diff --git a/.travis.yml b/.travis.yml
index 43693e0fde13579987b465dd59ce28b55ecefcdf..a34a295a70c7aab459368db2979a121ccc86e205 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -22,8 +22,8 @@ language: java
 jdk:
   - oraclejdk8
   - oraclejdk7
-  - openjdk7  
-  
+  - openjdk7
+
 install: mvn install -q -DskipTests=true
 
 script: mvn test -q
@@ -31,7 +31,8 @@ script: mvn test -q
 # Services to start for tests.
 services:
   - mongodb
+  - riak
 
 
 # Use the Container based infrastructure.
 sudo: false
diff --git a/bin/ycsb b/bin/ycsb
index ef2c18598f99926a3131c104c155bb4bdb5be9bf..de2cdfd371d60fe52856f28f97b5c18e71d9139f 100755
--- a/bin/ycsb
+++ b/bin/ycsb
@@ -79,6 +79,7 @@ DATABASES = {
     "nosqldb"      : "com.yahoo.ycsb.db.NoSqlDbClient",
     "orientdb"     : "com.yahoo.ycsb.db.OrientDBClient",
     "redis"        : "com.yahoo.ycsb.db.RedisClient",
+    "riak"         : "com.yahoo.ycsb.db.riak.RiakKVClient",
     "s3"           : "com.yahoo.ycsb.db.S3Client",
     "solr"         : "com.yahoo.ycsb.db.SolrClient",
     "tarantool"    : "com.yahoo.ycsb.db.TarantoolClient",
diff --git a/distribution/pom.xml b/distribution/pom.xml
index a74d8f1b16aae89ac8eca282df8f30074a0ed643..6dc972f6214036d9fe446c298160abf1d1c8c6f7 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -149,6 +149,11 @@ LICENSE file.
       <artifactId>redis-binding</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.yahoo.ycsb</groupId>
+      <artifactId>riak-binding</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>com.yahoo.ycsb</groupId>
       <artifactId>s3-binding</artifactId>
diff --git a/pom.xml b/pom.xml
index 8261e9b32d084fe0a1ff04ed06bf97604bf71fa9..71bdc2b7d7cbaf5b6aa274922599ec26aa62d8df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,7 @@ LICENSE file.
     <couchbase.version>1.4.10</couchbase.version>
     <couchbase2.version>2.2.6</couchbase2.version>
     <tarantool.version>1.6.5</tarantool.version>
+    <riak.version>2.0.5</riak.version>
     <aerospike.version>3.1.2</aerospike.version>
     <solr.version>5.4.0</solr.version>
   </properties>
@@ -127,6 +128,7 @@ LICENSE file.
     <module>nosqldb</module>
     <module>orientdb</module>
     <module>redis</module>
+    <module>riak</module>
     <module>s3</module>
     <module>solr</module>
     <module>tarantool</module>
diff --git a/riak/README.md b/riak/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..d7160808affb90f60e2ddc0134aa9998dd7f897f
--- /dev/null
+++ b/riak/README.md
@@ -0,0 +1,72 @@
+<!--
+Copyright (c) 2016 YCSB contributors. All rights reserved.
+Copyright 2014 Basho Technologies, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you
+may not use this file except in compliance with the License. You
+may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+implied. See the License for the specific language governing
+permissions and limitations under the License. See accompanying
+LICENSE file.
+-->
+
+Riak KV Client for Yahoo! Cloud System Benchmark (YCSB)
+=======================================================
+
+The Riak KV YCSB client is designed to work with the Yahoo! Cloud System Benchmark (YCSB) project (https://github.com/brianfrankcooper/YCSB) to support performance testing for the 2.x.y line of the Riak KV database.
+
+Creating a <i>bucket type</i> to use with YCSB
+----------------------------
+
+Perform the following operations on your Riak cluster to configure it for the benchmarks.
+
+Set the default backend for Riak to <i>LevelDB</i> in the `riak.conf` file of every node of your cluster. This is required to support <i>secondary indexes</i>, which are used for the `scan` transactions. You can do this by modifying the proper line as shown below.
+
+```
+storage_backend = leveldb
+```
+
+Now, create a bucket type named "ycsb"<sup id="a1">[1](#f1)</sup> by logging into one of the nodes in your cluster. Then, to use the <i>strong consistency model</i><sup id="a2">[2](#f2)</sup> (default), you need to follow the next two steps.
+
+1. In every `riak.conf` file, search for the `##strong_consistency=on` line and uncomment it. It's important that you do this <b>before you start your cluster</b>!
+2. Run the following `riak-admin` commands:
+
+  ```
+  riak-admin bucket-type create ycsb '{"props":{"consistent":true}}'
+  riak-admin bucket-type activate ycsb
+  ```
+
+Note that when using the strong consistency model, you **may have to specify the number of replicas to create for each object**. The *R* and *W* parameters (see next section) will in fact be ignored. The only information needed by this consistency model is how many nodes the system has to successfully query to consider a transaction completed. To set this parameter, you can add `"n_val":N` to the list of properties shown above (by default `N` is set to 3).
+
+If instead you want to use the <i>eventual consistency model</i> implemented in Riak, then type:
+```
+riak-admin bucket-type create ycsb '{"props":{"allow_mult":false}}'
+riak-admin bucket-type activate ycsb
+```
+
+Riak KV configuration parameters
+----------------------------
+You can either specify these configuration parameters via command line or set them in the `riak.properties` file.
+
+* `riak.hosts` - <b>string list</b>, comma separated list of IPs or FQDNs. For example: `riak.hosts=127.0.0.1,127.0.0.2,127.0.0.3` or `riak.hosts=riak1.mydomain.com,riak2.mydomain.com,riak3.mydomain.com`.
+* `riak.port` - <b>int</b>, the port on which every node is listening. It must match the one specified in the `riak.conf` file at the line `listener.protobuf.internal`.
+* `riak.bucket_type` - <b>string</b>, it must match the name of the bucket type created during setup (see section above).
+* `riak.r_val` - <b>int</b>, this value represents the number of Riak nodes that must return results for a read operation before the transaction is considered successfully completed. 
+* `riak.w_val` - <b>int</b>, this value represents the number of Riak nodes that must report success before an insert/update transaction is considered complete.
+* `riak.read_retry_count` - <b>int</b>, the number of times the client will try to read a key from Riak.
+* `riak.wait_time_before_retry` - <b>int</b>, the time (in milliseconds) before the client attempts to perform another read if the previous one failed.
+* `riak.transaction_time_limit` - <b>int</b>, the time (in seconds) the client waits before aborting the current transaction.
+* `riak.strong_consistency` - <b>boolean</b>, indicates whether to use *strong consistency* (true) or *eventual consistency* (false).
+* `riak.debug` - <b>boolean</b>, enables debug mode. This displays all the properties (specified or defaults) when a benchmark is started. Moreover, it shows error causes whenever these occur.
+
+<b>Note</b>: For more information on workloads and how to run them please see: https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload
+
+<b id="f1">1</b> As specified in the `riak.properties` file. See the configuration parameters section above for further info. [↩](#a1)
+
+<b id="f2">2</b> <b>IMPORTANT NOTE:</b> Currently the `scan` transactions are <b>NOT SUPPORTED</b> for benchmarks which use the strong consistency model! However, this will not cause the benchmark to fail; it simply won't perform any scan transactions at all. Such transactions will immediately return with a `Status.NOT_IMPLEMENTED` code. [↩](#a2)
diff --git a/riak/pom.xml b/riak/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..da335adfd6a955a82a517229a84fd228d91cfab9
--- /dev/null
+++ b/riak/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2016 YCSB contributors. All rights reserved.
+Copyright 2014 Basho Technologies, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you
+may not use this file except in compliance with the License. You
+may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+implied. See the License for the specific language governing
+permissions and limitations under the License. See accompanying
+LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>com.yahoo.ycsb</groupId>
+    <artifactId>binding-parent</artifactId>
+    <version>0.9.0-SNAPSHOT</version>
+    <relativePath>../binding-parent</relativePath>
+  </parent>
+
+  <artifactId>riak-binding</artifactId>
+  <name>Riak KV Binding</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.basho.riak</groupId>
+      <artifactId>riak-client</artifactId>
+      <version>${riak.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.yahoo.ycsb</groupId>
+      <artifactId>core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.collections</groupId>
+      <artifactId>google-collections</artifactId>
+      <version>1.0</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.12</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakKVClient.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakKVClient.java
new file mode 100644
index 0000000000000000000000000000000000000000..152cd86c03948dea7c570ebc6d4f6d6de1eb661d
--- /dev/null
+++ b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakKVClient.java
@@ -0,0 +1,566 @@
+/**
+ * Copyright (c) 2016 YCSB contributors All rights reserved.
+ * Copyright 2014 Basho Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb.db.riak;
+
+import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
+import com.basho.riak.client.api.commands.kv.UpdateValue;
+import com.basho.riak.client.core.RiakFuture;
+import com.yahoo.ycsb.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.basho.riak.client.api.RiakClient;
+import com.basho.riak.client.api.cap.Quorum;
+import com.basho.riak.client.api.commands.indexes.IntIndexQuery;
+import com.basho.riak.client.api.commands.kv.DeleteValue;
+import com.basho.riak.client.api.commands.kv.FetchValue;
+import com.basho.riak.client.api.commands.kv.StoreValue;
+import com.basho.riak.client.api.commands.kv.StoreValue.Option;
+import com.basho.riak.client.core.RiakCluster;
+import com.basho.riak.client.core.RiakNode;
+import com.basho.riak.client.core.query.Location;
+import com.basho.riak.client.core.query.Namespace;
+import com.basho.riak.client.core.query.RiakObject;
+import com.basho.riak.client.core.query.indexes.LongIntIndex;
+import com.basho.riak.client.core.util.BinaryValue;
+
+import static com.yahoo.ycsb.db.riak.RiakUtils.deserializeTable;
+import static com.yahoo.ycsb.db.riak.RiakUtils.getKeyAsLong;
+import static com.yahoo.ycsb.db.riak.RiakUtils.serializeTable;
+
+/**
+ * Riak KV 2.x.y client for YCSB framework.
+ *
+ */
+public final class RiakKVClient extends DB {
+  private static final String HOST_PROPERTY = "riak.hosts";
+  private static final String PORT_PROPERTY = "riak.port";
+  private static final String BUCKET_TYPE_PROPERTY = "riak.bucket_type";
+  private static final String R_VALUE_PROPERTY = "riak.r_val";
+  private static final String W_VALUE_PROPERTY = "riak.w_val";
+  private static final String READ_RETRY_COUNT_PROPERTY = "riak.read_retry_count";
+  private static final String WAIT_TIME_BEFORE_RETRY_PROPERTY = "riak.wait_time_before_retry";
+  private static final String TRANSACTION_TIME_LIMIT_PROPERTY = "riak.transaction_time_limit";
+  private static final String STRONG_CONSISTENCY_PROPERTY = "riak.strong_consistency";
+  private static final String DEBUG_PROPERTY = "riak.debug";
+
+  private static final Status TIME_OUT = new Status("TIME_OUT", "Cluster didn't respond after maximum wait time.");
+
+  private String[] hosts;
+  private int port;
+  private String bucketType;
+  private Quorum rvalue;
+  private Quorum wvalue;
+  private int readRetryCount;
+  private int waitTimeBeforeRetry;
+  private int transactionTimeLimit;
+  private boolean strongConsistency;
+  private boolean debug;
+
+  private RiakClient riakClient;
+  private RiakCluster riakCluster;
+
+  private void loadDefaultProperties() {
+    InputStream propFile = RiakKVClient.class.getClassLoader().getResourceAsStream("riak.properties");
+    Properties propsPF = new Properties(System.getProperties());
+
+    try {
+      propsPF.load(propFile);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    hosts = propsPF.getProperty(HOST_PROPERTY).split(",");
+    port = Integer.parseInt(propsPF.getProperty(PORT_PROPERTY));
+    bucketType = propsPF.getProperty(BUCKET_TYPE_PROPERTY);
+    rvalue = new Quorum(Integer.parseInt(propsPF.getProperty(R_VALUE_PROPERTY)));
+    wvalue = new Quorum(Integer.parseInt(propsPF.getProperty(W_VALUE_PROPERTY)));
+    readRetryCount = Integer.parseInt(propsPF.getProperty(READ_RETRY_COUNT_PROPERTY));
+    waitTimeBeforeRetry = Integer.parseInt(propsPF.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY));
+    transactionTimeLimit = Integer.parseInt(propsPF.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY));
+    strongConsistency = Boolean.parseBoolean(propsPF.getProperty(STRONG_CONSISTENCY_PROPERTY));
+    debug = Boolean.parseBoolean(propsPF.getProperty(DEBUG_PROPERTY));
+  }
+
+  private void loadProperties() {
+    loadDefaultProperties();
+
+    Properties props = getProperties();
+
+    String portString = props.getProperty(PORT_PROPERTY);
+    if (portString != null) {
+      port = Integer.parseInt(portString);
+    }
+
+    String hostsString = props.getProperty(HOST_PROPERTY);
+    if (hostsString != null) {
+      hosts = hostsString.split(",");
+    }
+
+    String bucketTypeString = props.getProperty(BUCKET_TYPE_PROPERTY);
+    if (bucketTypeString != null) {
+      bucketType = bucketTypeString;
+    }
+
+    String rValueString = props.getProperty(R_VALUE_PROPERTY);
+    if (rValueString != null) {
+      rvalue = new Quorum(Integer.parseInt(rValueString));
+    }
+
+    String wValueString = props.getProperty(W_VALUE_PROPERTY);
+    if (wValueString != null) {
+      wvalue = new Quorum(Integer.parseInt(wValueString));
+    }
+
+    String readRetryCountString = props.getProperty(READ_RETRY_COUNT_PROPERTY);
+    if (readRetryCountString != null) {
+      readRetryCount = Integer.parseInt(readRetryCountString);
+    }
+
+    String waitTimeBeforeRetryString = props.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY);
+    if (waitTimeBeforeRetryString != null) {
+      waitTimeBeforeRetry = Integer.parseInt(waitTimeBeforeRetryString);
+    }
+
+    String transactionTimeLimitString = props.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY);
+    if (transactionTimeLimitString != null) {
+      transactionTimeLimit = Integer.parseInt(transactionTimeLimitString);
+    }
+
+    String strongConsistencyString = props.getProperty(STRONG_CONSISTENCY_PROPERTY);
+    if (strongConsistencyString != null) {
+      strongConsistency = Boolean.parseBoolean(strongConsistencyString);
+    }
+
+    String debugString = props.getProperty(DEBUG_PROPERTY);
+    if (debugString != null) {
+      debug = Boolean.parseBoolean(debugString);
+    }
+  }
+
+  public void init() throws DBException {
+    loadProperties();
+
+    if (debug) {
+      System.err.println("DEBUG ENABLED. Configuration parameters:");
+      System.err.println("-----------------------------------------");
+      System.err.println("Hosts: " + Arrays.toString(hosts));
+      System.err.println("Port: " + port);
+      System.err.println("Bucket Type: " + bucketType);
+      System.err.println("R Val: " + rvalue.toString());
+      System.err.println("W Val: " + wvalue.toString());
+      System.err.println("Read Retry Count: " + readRetryCount);
+      System.err.println("Wait Time Before Retry: " + waitTimeBeforeRetry + " ms");
+      System.err.println("Transaction Time Limit: " + transactionTimeLimit + " s");
+      System.err.println("Consistency model: " + (strongConsistency ? "Strong" : "Eventual"));
+    }
+
+    RiakNode.Builder builder = new RiakNode.Builder().withRemotePort(port);
+    List<RiakNode> nodes = RiakNode.Builder.buildNodes(builder, Arrays.asList(hosts));
+    riakCluster = new RiakCluster.Builder(nodes).build();
+
+    try {
+      riakCluster.start();
+      riakClient = new RiakClient(riakCluster);
+    } catch (Exception e) {
+      System.err.println("Unable to properly start up the cluster. Reason: " + e.toString());
+      throw new DBException(e);
+    }
+  }
+
+  /**
+   * Read a record from the database. Each field/value pair from the result will be stored in a HashMap.
+   *
+   * @param table  The name of the table (Riak bucket)
+   * @param key    The record key of the record to read.
+   * @param fields The list of fields to read, or null for all of them
+   * @param result A HashMap of field/value pairs for the result
+   * @return Zero on success, a non-zero error code on error
+   */
+  @Override
+  public Status read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
+    Location location = new Location(new Namespace(bucketType, table), key);
+    FetchValue fv = new FetchValue.Builder(location).withOption(FetchValue.Option.R, rvalue).build();
+    FetchValue.Response response;
+
+    try {
+      response = fetch(fv);
+
+      if (response.isNotFound()) {
+        if (debug) {
+          System.err.println("Unable to read key " + key + ". Reason: NOT FOUND");
+        }
+
+        return Status.NOT_FOUND;
+      }
+    } catch (TimeoutException e) {
+      if (debug) {
+        System.err.println("Unable to read key " + key + ". Reason: TIME OUT");
+      }
+
+      return TIME_OUT;
+    } catch (Exception e) {
+      if (debug) {
+        System.err.println("Unable to read key " + key + ". Reason: " + e.toString());
+      }
+
+      return Status.ERROR;
+    }
+
+    // Create the result HashMap.
+    createResultHashMap(fields, response, result);
+
+    return Status.OK;
+  }
+
+  /**
+   * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in
+   * a HashMap.
+   * Note: The scan operation requires the use of secondary indexes (2i) and LevelDB.
+   * IMPORTANT NOTE: the 2i queries DO NOT WORK in conjunction with strong consistency!
+   * (ref: http://docs.basho.com/riak/kv/2.1.4/developing/usage/secondary-indexes/)
+   *
+   * @param table       The name of the table (Riak bucket)
+   * @param startkey    The record key of the first record to read.
+   * @param recordcount The number of records to read
+   * @param fields      The list of fields to read, or null for all of them
+   * @param result      A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
+   * @return Zero on success, a non-zero error code on error
+   */
+  @Override
+  public Status scan(String table, String startkey, int recordcount, Set<String> fields,
+                     Vector<HashMap<String, ByteIterator>> result) {
+    // As of 2.1.4 Riak KV version, strong consistency does not support any suitable mean capable of searching
+    // consecutive stored keys, as requested by a scan transaction. So, the latter WILL NOT BE PERFORMED AT ALL!
+    // More info at http://docs.basho.com/riak/kv/2.1.4/developing/app-guide/strong-consistency/
+    if (strongConsistency) {
+      return Status.NOT_IMPLEMENTED;
+    }
+
+    Namespace ns = new Namespace(bucketType, table);
+
+    IntIndexQuery iiq = new IntIndexQuery
+        .Builder(ns, "key", getKeyAsLong(startkey), Long.MAX_VALUE)
+        .withMaxResults(recordcount)
+        .withPaginationSort(true)
+        .build();
+
+    RiakFuture<IntIndexQuery.Response, IntIndexQuery> future = riakClient.executeAsync(iiq);
+
+    try {
+      IntIndexQuery.Response response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
+      List<IntIndexQuery.Response.Entry> entries = response.getEntries();
+
+      for (IntIndexQuery.Response.Entry entry : entries) {
+        Location location = entry.getRiakObjectLocation();
+
+        FetchValue fv = new FetchValue.Builder(location)
+            .withOption(FetchValue.Option.R, rvalue)
+            .build();
+
+        FetchValue.Response keyResponse = fetch(fv);
+
+        if (keyResponse.isNotFound()) {
+          if (debug) {
+            System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
+                "Reason: NOT FOUND");
+          }
+
+          return Status.NOT_FOUND;
+        }
+
+        HashMap<String, ByteIterator> partialResult = new HashMap<>();
+        createResultHashMap(fields, keyResponse, partialResult);
+        result.add(partialResult);
+      }
+    } catch (TimeoutException e) {
+      if (debug) {
+        System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
+            "Reason: TIME OUT");
+      }
+
+      return TIME_OUT;
+    } catch (Exception e) {
+      if (debug) {
+        System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " +
+            "Reason: " + e.toString());
+      }
+
+      return Status.ERROR;
+    }
+
+    return Status.OK;
+  }
+
+  /**
+   * Tries to perform a read and, whenever it fails, retries to do it. It actually does try as many time as indicated,
+   * even if the function riakClient.execute(fv) throws an exception. This is needed for those situation in which the
+   * cluster is unable to respond properly due to overload. Note however that if the cluster doesn't respond after
+   * transactionTimeLimit, the transaction is discarded immediately.
+   *
+   * @param fv The value to fetch from the cluster.
+   */
+  private FetchValue.Response fetch(FetchValue fv) throws TimeoutException {
+    FetchValue.Response response = null;
+
+    for (int i = 0; i < readRetryCount; i++) {
+      RiakFuture<FetchValue.Response, Location> future = riakClient.executeAsync(fv);
+
+      try {
+        response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
+
+        if (!response.isNotFound()) {
+          break;
+        }
+      } catch (TimeoutException e) {
+        // Let the callee decide how to handle this exception...
+        throw new TimeoutException();
+      } catch (Exception e) {
+        // Sleep for a few ms before retrying...
+        try {
+          Thread.sleep(waitTimeBeforeRetry);
+        } catch (InterruptedException e1) {
+          e1.printStackTrace();
+        }
+      }
+    }
+
+    return response;
+  }
+
+  /**
+   * Function that retrieves all the fields searched within a read or scan operation and puts them in the result
+   * HashMap.
+   *
+   * @param fields        The list of fields to read, or null for all of them.
+   * @param response      A Vector of HashMaps, where each HashMap is a set field/value pairs for one record.
+   * @param resultHashMap The HashMap to return as result.
+   */
+  private void createResultHashMap(Set<String> fields, FetchValue.Response response, HashMap<String, ByteIterator>
+      resultHashMap) {
+    // If everything went fine, then a result must be given. Such an object is a hash table containing the (field,
+    // value) pairs based on the requested fields. Note that in a read operation, ONLY ONE OBJECT IS RETRIEVED!
+    // The following line retrieves the previously serialized table which was store with an insert transaction.
+    byte[] responseFieldsAndValues = response.getValues().get(0).getValue().getValue();
+
+    // Deserialize the stored response table.
+    HashMap<String, ByteIterator> deserializedTable = new HashMap<>();
+    deserializeTable(responseFieldsAndValues, deserializedTable);
+
+    // If only specific fields are requested, then only these should be put in the result object!
+    if (fields != null) {
+      // Populate the HashMap to provide as result.
+      for (Object field : fields.toArray()) {
+        // Comparison between a requested field and the ones retrieved. If they're equal (i.e. the get() operation
+        // DOES NOT return a null value), then  proceed to store the pair in the resultHashMap.
+        ByteIterator value = deserializedTable.get(field);
+
+        if (value != null) {
+          resultHashMap.put((String) field, value);
+        }
+      }
+    } else {
+      // If, instead, no field is specified, then all the ones retrieved must be provided as result.
+      for (String field : deserializedTable.keySet()) {
+        resultHashMap.put(field, deserializedTable.get(field));
+      }
+    }
+  }
+
+  /**
+   * Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the
+   * record with the specified record key. Also creates a secondary index (2i) for each record consisting of the key
+   * converted to long to be used for the scan operation.
+   *
+   * @param table  The name of the table (Riak bucket)
+   * @param key    The record key of the record to insert.
+   * @param values A HashMap of field/value pairs to insert in the record
+   * @return Zero on success, a non-zero error code on error
+   */
+  @Override
+  public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
+    Location location = new Location(new Namespace(bucketType, table), key);
+    RiakObject object = new RiakObject();
+
+    object.setValue(BinaryValue.create(serializeTable(values)));
+    object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
+
+    StoreValue store = new StoreValue.Builder(object)
+        .withLocation(location)
+        .withOption(Option.W, wvalue)
+        .build();
+
+    RiakFuture<StoreValue.Response, Location> future = riakClient.executeAsync(store);
+
+    try {
+      future.get(transactionTimeLimit, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      if (debug) {
+        System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2]
+            .getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: TIME OUT");
+      }
+
+      return TIME_OUT;
+    } catch (Exception e) {
+      if (debug) {
+        System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2]
+            .getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: " + e.toString());
+      }
+
+      return Status.ERROR;
+    }
+
+    return Status.OK;
+  }
+
+  /**
+   * Auxiliary class needed for object substitution within the update operation. It is a fundamental part of the
+   * fetch-update (locally)-store cycle described by Basho to properly perform a strong-consistent update.
+   */
+  private static final class UpdateEntity extends UpdateValue.Update<RiakObject> {
+    private final RiakObject object;
+
+    private UpdateEntity(RiakObject object) {
+      this.object = object;
+    }
+
+    //Simply returns the object.
+    @Override
+    public RiakObject apply(RiakObject original) {
+      return object;
+    }
+  }
+
+  /**
+   * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the
+   * record with the specified record key, overwriting any existing values with the same field name.
+   *
+   * @param table  The name of the table (Riak bucket)
+   * @param key    The record key of the record to write.
+   * @param values A HashMap of field/value pairs to update in the record
+   * @return Zero on success, a non-zero error code on error
+   */
+  @Override
+  public Status update(String table, String key, HashMap<String, ByteIterator> values) {
+    if (!strongConsistency) {
+      return insert(table, key, values);
+    }
+
+    Location location = new Location(new Namespace(bucketType, table), key);
+    RiakObject object = new RiakObject();
+
+    object.setValue(BinaryValue.create(serializeTable(values)));
+    object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
+
+    UpdateValue update = new UpdateValue.Builder(location)
+        .withUpdate(new UpdateEntity(object))
+        .build();
+
+    RiakFuture<UpdateValue.Response, Location> future = riakClient.executeAsync(update);
+
+    try {
+      // For some reason, the update transaction doesn't throw any exception when no cluster has been started, so one
+      // needs to check whether it was done or not. When calling the wasUpdated() function with no nodes available, a
+      // NullPointerException is thrown.
+      // Moreover, such exception could be thrown when more threads are trying to update the same key or, more
+      // generally, when the system is being queried by many clients (i.e. overloaded). This is a known limitation of
+      // Riak KV's strong consistency implementation.
+      future.get(transactionTimeLimit, TimeUnit.SECONDS).wasUpdated();
+    } catch (TimeoutException e) {
+      if (debug) {
+        System.err.println("Unable to update key " + key + ". Reason: TIME OUT");
+      }
+
+      return TIME_OUT;
+    } catch (Exception e) {
+      if (debug) {
+        System.err.println("Unable to update key " + key + ". Reason: " + e.toString());
+      }
+
+      return Status.ERROR;
+    }
+
+    return Status.OK;
+  }
+
+
+  /**
+   * Delete a record from the database.
+   *
+   * @param table The name of the table (Riak bucket)
+   * @param key   The record key of the record to delete.
+   * @return Zero on success, a non-zero error code on error
+   */
+  @Override
+  public Status delete(String table, String key) {
+    Location location = new Location(new Namespace(bucketType, table), key);
+    DeleteValue dv = new DeleteValue.Builder(location).build();
+
+    RiakFuture<Void, Location> future = riakClient.executeAsync(dv);
+
+    try {
+      future.get(transactionTimeLimit, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      if (debug) {
+        System.err.println("Unable to delete key " + key + ". Reason: TIME OUT");
+      }
+
+      return TIME_OUT;
+    } catch (Exception e) {
+      if (debug) {
+        System.err.println("Unable to delete key " + key + ". Reason: " + e.toString());
+      }
+
+      return Status.ERROR;
+    }
+
+    return Status.OK;
+  }
+
+  public void cleanup() throws DBException {
+    try {
+      riakCluster.shutdown();
+    } catch (Exception e) {
+      System.err.println("Unable to properly shutdown the cluster. Reason: " + e.toString());
+      throw new DBException(e);
+    }
+  }
+
+  /**
+   * Auxiliary function needed for testing. It configures the default bucket-type to take care of the consistency
+   * problem by disallowing the siblings creation. Moreover, it disables strong consistency, as the scan transaction
+   * test would otherwise fail.
+   *
+   * @param bucket     The bucket name.
+   * @throws Exception Thrown if something bad happens.
+     */
+  void setTestEnvironment(String bucket) throws Exception {
+    bucketType = "default";
+    strongConsistency = false;
+
+    Namespace ns = new Namespace(bucketType, bucket);
+    StoreBucketProperties newBucketProperties = new StoreBucketProperties.Builder(ns).withAllowMulti(false).build();
+
+    riakClient.execute(newBucketProperties);
+  }
+}
diff --git a/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java
new file mode 100644
index 0000000000000000000000000000000000000000..e4471541c8ba2e225bacd4a43113896f4156e7a6
--- /dev/null
+++ b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java
@@ -0,0 +1,147 @@
+/**
+ * Copyright (c) 2016 YCSB contributors All rights reserved.
+ * Copyright 2014 Basho Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb.db.riak;
+
+import java.io.*;
+import java.util.Map;
+import java.util.Set;
+
+import com.yahoo.ycsb.ByteArrayByteIterator;
+import com.yahoo.ycsb.ByteIterator;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Utility class for Riak KV Client.
+ *
+ */
+final class RiakUtils {
+
+  // Non-instantiable utility class.
+  private RiakUtils() {
+    super();
+  }
+
+  /**
+   * Encodes an int as a 4-byte array in big-endian order (inverse of fromBytes).
+   */
+  private static byte[] toBytes(final int anInteger) {
+    byte[] aResult = new byte[4];
+
+    aResult[0] = (byte) (anInteger >> 24);
+    aResult[1] = (byte) (anInteger >> 16);
+    aResult[2] = (byte) (anInteger >> 8);
+    aResult[3] = (byte) (anInteger /* >> 0 */);
+
+    return aResult;
+  }
+
+  /**
+   * Decodes a 4-byte big-endian array back into an int (inverse of toBytes).
+   */
+  private static int fromBytes(final byte[] aByteArray) {
+    checkArgument(aByteArray.length == 4);
+
+    // Lower three bytes are masked with 0xFF to undo sign extension before shifting into place.
+    return (aByteArray[0] << 24) | (aByteArray[1] & 0xFF) << 16 | (aByteArray[2] & 0xFF) << 8 | (aByteArray[3] & 0xFF);
+  }
+
+  // Best-effort close: a failure to close is only logged, never propagated.
+  private static void close(final OutputStream anOutputStream) {
+    try {
+      anOutputStream.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  // Best-effort close: a failure to close is only logged, never propagated.
+  private static void close(final InputStream anInputStream) {
+    try {
+      anInputStream.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Serializes a Map, transforming the contained list of (String, ByteIterator) couples into a byte array.
+   * Each entry is encoded as: 4-byte name length, name bytes, 4-byte value length, value bytes.
+   *
+   * @param aTable A Map to serialize.
+   * @return A byte array containing the serialized table.
+   */
+  static byte[] serializeTable(Map<String, ByteIterator> aTable) {
+    final ByteArrayOutputStream anOutputStream = new ByteArrayOutputStream();
+    final Set<Map.Entry<String, ByteIterator>> theEntries = aTable.entrySet();
+
+    try {
+      for (final Map.Entry<String, ByteIterator> anEntry : theEntries) {
+        // NOTE(review): getBytes() uses the platform default charset; deserializeTable relies on the
+        // same default, so round-trips are consistent on one machine — confirm if data moves between hosts.
+        final byte[] aColumnName = anEntry.getKey().getBytes();
+
+        anOutputStream.write(toBytes(aColumnName.length));
+        anOutputStream.write(aColumnName);
+
+        final byte[] aColumnValue = anEntry.getValue().toArray();
+
+        anOutputStream.write(toBytes(aColumnValue.length));
+        anOutputStream.write(aColumnValue);
+      }
+      return anOutputStream.toByteArray();
+    } catch (IOException e) {
+      // A ByteArrayOutputStream should never throw here; treat it as a programming error.
+      throw new IllegalStateException(e);
+    } finally {
+      close(anOutputStream);
+    }
+  }
+
+  /**
+   * Deserializes an input byte array, transforming it into a list of (String, ByteIterator) couples (i.e. a Map).
+   *
+   * @param aValue    A byte array containing the table to deserialize.
+   * @param theResult A Map containing the deserialized table.
+   */
+  static void deserializeTable(final byte[] aValue, final Map<String, ByteIterator> theResult) {
+    final ByteArrayInputStream anInputStream = new ByteArrayInputStream(aValue);
+    byte[] aSizeBuffer = new byte[4];
+
+    try {
+      while (anInputStream.available() > 0) {
+        // NOTE(review): read() return values are ignored. That is safe for ByteArrayInputStream (it fills
+        // the buffer whenever enough bytes remain) but would be a partial-read bug for a general InputStream.
+        anInputStream.read(aSizeBuffer);
+        final int aColumnNameLength = fromBytes(aSizeBuffer);
+
+        final byte[] aColumnNameBuffer = new byte[aColumnNameLength];
+        anInputStream.read(aColumnNameBuffer);
+
+        anInputStream.read(aSizeBuffer);
+        final int aColumnValueLength = fromBytes(aSizeBuffer);
+
+        final byte[] aColumnValue = new byte[aColumnValueLength];
+        anInputStream.read(aColumnValue);
+
+        theResult.put(new String(aColumnNameBuffer), new ByteArrayByteIterator(aColumnValue));
+      }
+    } catch (Exception e) {
+      // Any malformed input (e.g. truncated length prefix) surfaces as an IllegalStateException.
+      throw new IllegalStateException(e);
+    } finally {
+      close(anInputStream);
+    }
+  }
+
+  /**
+   * Obtains a Long number from a key string. This will be the key used by Riak for all the transactions.
+   * Only a leading alphabetic prefix is stripped (e.g. "user123" becomes 123).
+   *
+   * @param key The key to convert from String to Long.
+   * @return A Long number parsed from the key String.
+   * @throws NumberFormatException if the remainder after stripping the prefix is not a valid long.
+   */
+  static Long getKeyAsLong(String key) {
+    String keyString = key.replaceFirst("[a-zA-Z]*", "");
+
+    return Long.parseLong(keyString);
+  }
+}
diff --git a/riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java
new file mode 100644
index 0000000000000000000000000000000000000000..32d163fdcf7cc0d3b7134e382caf673d593e54b2
--- /dev/null
+++ b/riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright (c) 2016 YCSB contributors All rights reserved.
+ * Copyright 2014 Basho Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+/**
+ * The YCSB binding for <a href="http://basho.com/products/riak-kv/">Riak KV</a> 2.x.y.
+ *
+ */
+package com.yahoo.ycsb.db.riak;
\ No newline at end of file
diff --git a/riak/src/main/resources/riak.properties b/riak/src/main/resources/riak.properties
new file mode 100644
index 0000000000000000000000000000000000000000..6e418848808a6acb852503060ef23889cde927fc
--- /dev/null
+++ b/riak/src/main/resources/riak.properties
@@ -0,0 +1,57 @@
+##
+# Copyright (c) 2016 YCSB contributors All rights reserved.
+# Copyright 2014 Basho Technologies, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you
+# may not use this file except in compliance with the License. You
+# may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License. See accompanying
+# LICENSE file.
+#
+
+# RiakKVClient - Default Properties
+# Note: Change the properties below to set the values to use for your test. You can set them either here or from the
+# command line. Note that the latter choice overrides these settings.
+
+# riak.hosts - string list, comma separated list of IPs or FQDNs.
+# EX: 127.0.0.1,127.0.0.2,127.0.0.3 or riak1.mydomain.com,riak2.mydomain.com,riak3.mydomain.com
+riak.hosts=127.0.0.1
+
+# riak.port - int, the port on which every node is listening. It must match the one specified in the riak.conf file
+# at the line "listener.protobuf.internal".
+riak.port=8087
+
+# riak.bucket_type - string, must match value of bucket type created during setup. See readme.md for more information
+riak.bucket_type=ycsb
+
+# riak.r_val - int, the R value represents the number of Riak nodes that must return results for a read before the read
+# is considered successful.
+riak.r_val=2
+
+# riak.w_val - int, the W value represents the number of Riak nodes that must report success before an update is
+# considered complete.
+riak.w_val=2
+
+# riak.read_retry_count - int, number of times the client will try to read a key from Riak.
+riak.read_retry_count=5
+
+# riak.wait_time_before_retry - int, time (in milliseconds) the client waits before attempting to perform another
+# read if the previous one failed.
+riak.wait_time_before_retry=200
+
+# riak.transaction_time_limit - int, time (in seconds) the client waits before aborting the current transaction.
+riak.transaction_time_limit=10
+
+# riak.strong_consistency - boolean, indicates whether to use strong consistency (true) or eventual consistency (false).
+riak.strong_consistency=true
+
+# riak.debug - boolean, enables debug mode. This displays all the properties (specified or defaults) when a benchmark
+# is started.
+riak.debug=false
\ No newline at end of file
diff --git a/riak/src/test/java/com/yahoo/ycsb/db/riak/RiakKVClientTest.java b/riak/src/test/java/com/yahoo/ycsb/db/riak/RiakKVClientTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..a571fe43242601a653227f0e1fbc255780e626ef
--- /dev/null
+++ b/riak/src/test/java/com/yahoo/ycsb/db/riak/RiakKVClientTest.java
@@ -0,0 +1,264 @@
+/**
+ * Copyright (c) 2016 YCSB contributors All rights reserved.
+ * Copyright 2014 Basho Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb.db.riak;
+
+import java.util.*;
+
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.Status;
+import com.yahoo.ycsb.StringByteIterator;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeNoException;
+import static org.junit.Assume.assumeThat;
+
+/**
+ * Integration tests for the Riak KV client.
+ */
+public class RiakKVClientTest {
+  private static RiakKVClient riakClient;
+
+  private static final String bucket = "testBucket";
+  private static final String keyPrefix = "testKey";
+  private static final int recordsToInsert = 20;
+  private static final int recordsToScan = 7;
+  private static final String firstField = "Key number";
+  private static final String secondField = "Key number doubled";
+  private static final String thirdField = "Key number square";
+
+  // Set to true only after the dataset population succeeds, so tearDownClass knows whether to delete keys.
+  private static boolean testStarted = false;
+
+  /**
+   * Creates a cluster for testing purposes. The whole suite is skipped (via JUnit assumptions),
+   * rather than failed, when no Riak KV instance is reachable.
+   */
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    riakClient = new RiakKVClient();
+    riakClient.init();
+
+    // Set the test bucket environment with the appropriate parameters.
+    try {
+      riakClient.setTestEnvironment(bucket);
+    } catch(Exception e) {
+      assumeNoException("Unable to configure Riak KV for test, aborting.", e);
+    }
+
+    // Just add some records to work on...
+    for (int i = 0; i < recordsToInsert; i++) {
+      // Abort the entire test whenever the dataset population operation fails.
+      assumeThat("Riak KV is NOT RUNNING, aborting test.",
+          riakClient.insert(bucket, keyPrefix + String.valueOf(i), StringByteIterator.getByteIteratorMap(
+              createExpectedHashMap(i))),
+              is(Status.OK));
+    }
+
+    // Variable to check to determine whether the test has started or not.
+    testStarted = true;
+  }
+
+  /**
+   * Shuts down the cluster created.
+   */
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    // Delete all added keys before cleanup ONLY IF TEST ACTUALLY STARTED.
+    if (testStarted) {
+      // The bound is inclusive (<=) to also remove the extra record inserted by testInsert().
+      for (int i = 0; i <= recordsToInsert; i++) {
+        delete(keyPrefix + Integer.toString(i));
+      }
+    }
+
+    riakClient.cleanup();
+  }
+
+  /**
+   * Test method for read transaction. It is designed to read two of the three fields stored for each key, to also test
+   * if the createResultHashMap() function implemented in RiakKVClient.java works as expected.
+   */
+  @Test
+  public void testRead() {
+    // Choose a random key to read, among the available ones.
+    int readKeyNumber = new Random().nextInt(recordsToInsert);
+
+    // Prepare two fields to read.
+    Set<String> fields = new HashSet<>();
+    fields.add(firstField);
+    fields.add(thirdField);
+
+    // Prepare an expected result.
+    HashMap<String, String> expectedValue = new HashMap<>();
+    expectedValue.put(firstField, Integer.toString(readKeyNumber));
+    expectedValue.put(thirdField, Integer.toString(readKeyNumber * readKeyNumber));
+
+    // Define a HashMap to store the actual result.
+    HashMap<String, ByteIterator> readValue = new HashMap<>();
+
+    // If a read transaction has been properly done, then one has to receive a Status.OK return from the read()
+    // function. Moreover, the actual returned result MUST match the expected one.
+    assertEquals("Read transaction FAILED.",
+        Status.OK,
+        riakClient.read(bucket, keyPrefix + Integer.toString(readKeyNumber), fields, readValue));
+
+    assertEquals("Read test FAILED. Actual read transaction value is NOT MATCHING the expected one.",
+        expectedValue.toString(),
+        readValue.toString());
+  }
+
+  /**
+   * Test method for scan transaction. A scan transaction has to be considered successfully completed only if all the
+   * requested values are read (i.e. scan transaction returns with Status.OK). Moreover, one has to check if the
+   * obtained results match the expected ones.
+   */
+  @Test
+  public void testScan() {
+    // Choose, among the available ones, a random key as starting point for the scan transaction.
+    int startScanKeyNumber = new Random().nextInt(recordsToInsert - recordsToScan);
+
+    // Prepare a HashMap vector to store the scan transaction results.
+    Vector<HashMap<String, ByteIterator>> scannedValues = new Vector<>();
+
+    // Check whether the scan transaction is correctly performed or not.
+    assertEquals("Scan transaction FAILED.",
+        Status.OK,
+        riakClient.scan(bucket, keyPrefix + Integer.toString(startScanKeyNumber), recordsToScan, null,
+            scannedValues));
+
+    // After the scan transaction completes, compare the obtained results with the expected ones.
+    for (int i = 0; i < recordsToScan; i++) {
+      assertEquals("Scan test FAILED: the current scanned key is NOT MATCHING the expected one.",
+          createExpectedHashMap(startScanKeyNumber + i).toString(),
+          scannedValues.get(i).toString());
+    }
+  }
+
+  /**
+   * Test method for update transaction. The test is designed to restore the previously read key. It is assumed to be
+   * correct when, after performing the update transaction, one reads the just provided values.
+   */
+  @Test
+  public void testUpdate() {
+    // Choose a random key to read, among the available ones.
+    int updateKeyNumber = new Random().nextInt(recordsToInsert);
+
+    // Define a HashMap to save the previously stored values for eventually restoring them.
+    HashMap<String, ByteIterator> readValueBeforeUpdate = new HashMap<>();
+    riakClient.read(bucket, keyPrefix + Integer.toString(updateKeyNumber), null, readValueBeforeUpdate);
+
+    // Prepare an update HashMap to store.
+    HashMap<String, String> updateValue = new HashMap<>();
+    updateValue.put(firstField, "UPDATED");
+    updateValue.put(secondField, "UPDATED");
+    updateValue.put(thirdField, "UPDATED");
+
+    // First of all, perform the update and check whether it's failed or not.
+    assertEquals("Update transaction FAILED.",
+        Status.OK,
+        riakClient.update(bucket, keyPrefix + Integer.toString(updateKeyNumber), StringByteIterator
+            .getByteIteratorMap(updateValue)));
+
+    // Then, read the key again and...
+    HashMap<String, ByteIterator> readValueAfterUpdate = new HashMap<>();
+    assertEquals("Update test FAILED. Unable to read key value.",
+        Status.OK,
+        riakClient.read(bucket, keyPrefix + Integer.toString(updateKeyNumber), null, readValueAfterUpdate));
+
+    // ...compare the result with the new one!
+    assertEquals("Update transaction NOT EXECUTED PROPERLY. Values DID NOT CHANGE.",
+        updateValue.toString(),
+        readValueAfterUpdate.toString());
+
+    // Finally, restore the previously read key.
+    assertEquals("Update test FAILED. Unable to restore previous key value.",
+        Status.OK,
+        riakClient.update(bucket, keyPrefix + Integer.toString(updateKeyNumber), readValueBeforeUpdate));
+  }
+
+  /**
+   * Test method for insert transaction. It is designed to insert a key just after the last key inserted in the setUp()
+   * phase.
+   */
+  @Test
+  public void testInsert() {
+    // Define a HashMap to insert and another one for the comparison operation.
+    HashMap<String, String> insertValue = createExpectedHashMap(recordsToInsert);
+    HashMap<String, ByteIterator> readValue = new HashMap<>();
+
+    // Check whether the insertion transaction was performed or not.
+    assertEquals("Insert transaction FAILED.",
+        Status.OK,
+        riakClient.insert(bucket, keyPrefix + Integer.toString(recordsToInsert), StringByteIterator.
+            getByteIteratorMap(insertValue)));
+
+    // Finally, compare the insertion performed with the one expected by reading the key.
+    assertEquals("Insert test FAILED. Unable to read inserted value.",
+        Status.OK,
+        riakClient.read(bucket, keyPrefix + Integer.toString(recordsToInsert), null, readValue));
+    assertEquals("Insert test FAILED. Actual read transaction value is NOT MATCHING the inserted one.",
+        insertValue.toString(),
+        readValue.toString());
+  }
+
+  /**
+   * Test method for delete transaction. The test deletes a key, then performs a read that should give a
+   * Status.NOT_FOUND response. Finally, it restores the previously read key.
+   */
+  @Test
+  public void testDelete() {
+    // Choose a random key to delete, among the available ones.
+    int deleteKeyNumber = new Random().nextInt(recordsToInsert);
+
+    // Define a HashMap to save the previously stored values for its eventual restore.
+    HashMap<String, ByteIterator> readValueBeforeDelete = new HashMap<>();
+    riakClient.read(bucket, keyPrefix + Integer.toString(deleteKeyNumber), null, readValueBeforeDelete);
+
+    // First of all, delete the key.
+    assertEquals("Delete transaction FAILED.",
+        Status.OK,
+        delete(keyPrefix + Integer.toString(deleteKeyNumber)));
+
+    // Then, check if the deletion was actually achieved.
+    assertEquals("Delete test FAILED. Key NOT deleted.",
+        Status.NOT_FOUND,
+        riakClient.read(bucket, keyPrefix + Integer.toString(deleteKeyNumber), null, null));
+
+    // Finally, restore the previously deleted key.
+    assertEquals("Delete test FAILED. Unable to restore previous key value.",
+        Status.OK,
+        riakClient.insert(bucket, keyPrefix + Integer.toString(deleteKeyNumber), readValueBeforeDelete));
+  }
+
+  // Convenience wrapper around riakClient.delete() using the shared test bucket.
+  private static Status delete(String key) {
+    return riakClient.delete(bucket, key);
+  }
+
+  // Builds the canonical record for key number `value`: the value itself, its double, and its square.
+  private static HashMap<String, String> createExpectedHashMap(int value) {
+    HashMap<String, String> values = new HashMap<>();
+
+    values.put(firstField, Integer.toString(value));
+    values.put(secondField, Integer.toString(2 * value));
+    values.put(thirdField, Integer.toString(value * value));
+
+    return values;
+  }
+}