diff --git a/aerospike/README.md b/aerospike/README.md new file mode 100644 index 0000000000000000000000000000000000000000..c478916098b536b1e4902891c825dfb6c7817bd7 --- /dev/null +++ b/aerospike/README.md @@ -0,0 +1,41 @@ +## Quick Start + +This section describes how to run YCSB on Aerospike. + +### 1. Start Aerospike + +### 2. Install Java and Maven + +### 3. Set Up YCSB + +Git clone YCSB and compile: + + git clone http://github.com/brianfrankcooper/YCSB.git + cd YCSB + mvn -pl com.yahoo.ycsb:aerospike-binding -am clean package + +### 4. Provide Aerospike Connection Parameters + +The following connection parameters are available. + + * `as.host` - The Aerospike cluster to connect to (default: `localhost`) + * `as.port` - The port to connect to (default: `3000`) + * `as.user` - The user to connect as (no default) + * `as.password` - The password for the user (no default) + * `as.timeout` - The transaction and connection timeout (in ms, default: `1000`) + * `as.namespace` - The namespace to be used for the benchmark (default: `ycsb`) + +Add them to the workload or set them with the shell command, as in: + + ./bin/ycsb load aerospike -s -P workloads/workloada -p as.timeout=5000 >outputLoad.txt + +### 5. Load Data and Run Tests + +Load the data: + + ./bin/ycsb load aerospike -s -P workloads/workloada >outputLoad.txt + +Run the workload test: + + ./bin/ycsb run aerospike -s -P workloads/workloada >outputRun.txt + diff --git a/aerospike/pom.xml b/aerospike/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..a9e7f208e28fc4ff616e014c19dac6cdc5f79173 --- /dev/null +++ b/aerospike/pom.xml @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>binding-parent</artifactId> + <version>0.3.0-SNAPSHOT</version> + <relativePath>../binding-parent</relativePath> + </parent> + + <artifactId>aerospike-binding</artifactId> + <name>Aerospike DB Binding</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>com.aerospike</groupId> + <artifactId>aerospike-client</artifactId> + <version>${aerospike.version}</version> + </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> +</project> diff --git a/aerospike/src/main/java/com/yahoo/ycsb/db/AerospikeClient.java b/aerospike/src/main/java/com/yahoo/ycsb/db/AerospikeClient.java new file mode 100644 index 0000000000000000000000000000000000000000..f48523b3beec8a014c51195f730bb46a0d0e4010 --- /dev/null +++ b/aerospike/src/main/java/com/yahoo/ycsb/db/AerospikeClient.java @@ -0,0 +1,205 @@ +package com.yahoo.ycsb.db; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Vector; + +import com.aerospike.client.AerospikeException; +import com.aerospike.client.Bin; +import com.aerospike.client.Key; +import com.aerospike.client.Record; +import com.aerospike.client.ResultCode; +import com.aerospike.client.policy.ClientPolicy; +import com.aerospike.client.policy.Policy; +import com.aerospike.client.policy.RecordExistsAction; +import com.aerospike.client.policy.WritePolicy; + +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DBException; + +public class AerospikeClient extends com.yahoo.ycsb.DB { + private static final boolean DEBUG = false; + + private static final String DEFAULT_HOST = "localhost"; + private static final String DEFAULT_PORT = "3000"; + private static final String DEFAULT_TIMEOUT = "10000"; + private static final String DEFAULT_NAMESPACE = "ycsb"; + + private static final int RESULT_OK = 0; + private static final int RESULT_ERROR = 1; + + private static final int WRITE_OVERLOAD_DELAY = 5; + private static final int WRITE_OVERLOAD_TRIES = 3; + + private String namespace = null; + + private com.aerospike.client.AerospikeClient client = null; + private int writeOverloadTries = WRITE_OVERLOAD_TRIES; + + private Policy readPolicy = new Policy(); + private WritePolicy insertPolicy = new WritePolicy(); + private WritePolicy updatePolicy = new WritePolicy(); + private WritePolicy deletePolicy = new WritePolicy(); + + public void init() throws DBException { + insertPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY; + updatePolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY; + + Properties props = getProperties(); + + namespace = props.getProperty("as.namespace", DEFAULT_NAMESPACE); + + String host = props.getProperty("as.host", DEFAULT_HOST); + String user = props.getProperty("as.user"); + String password = props.getProperty("as.password"); + int port = Integer.parseInt(props.getProperty("as.port", DEFAULT_PORT)); + int timeout = Integer.parseInt(props.getProperty("as.timeout", + DEFAULT_TIMEOUT)); + + readPolicy.timeout = timeout; + insertPolicy.timeout = timeout; + updatePolicy.timeout = timeout; + deletePolicy.timeout = timeout; + + ClientPolicy clientPolicy = new ClientPolicy(); + + if (user != null && password != null) { + clientPolicy.user = user; + clientPolicy.password = password; + } + + try { + client = + new com.aerospike.client.AerospikeClient(clientPolicy, host, port); + } catch (AerospikeException e) { + throw new DBException(String.format("Error while creating Aerospike " + + "client for %s:%d.", host, port)); + } + } + + public void cleanup() throws DBException { + client.close(); + } + + @Override + public int read(String table, String key, Set<String> fields, + HashMap<String, ByteIterator> result) { + try { + Record record; + + if (fields != null) { + record = client.get(readPolicy, new Key(namespace, table, key), + fields.toArray(new String[fields.size()])); + } else { + record = client.get(readPolicy, new Key(namespace, table, key)); + } + + if (record == null) { + if (DEBUG) { + System.err.println("Record key " + key + " not found (read)"); + } + + return RESULT_ERROR; + } + + for (Map.Entry<String, Object> entry: record.bins.entrySet()) { + result.put(entry.getKey(), + new ByteArrayByteIterator((byte[])entry.getValue())); + } + + return RESULT_OK; + } catch (AerospikeException e) { + System.err.println("Error while reading key " + key + ": " + e); + return RESULT_ERROR; + } + } + + @Override + public int scan(String table, String start, int count, Set<String> fields, + Vector<HashMap<String, ByteIterator>> result) { + System.err.println("Scan not implemented"); + return RESULT_ERROR; + } + + private int write(String table, String key, WritePolicy writePolicy, + HashMap<String, ByteIterator> values) { + if (writeOverloadTries == 0) { + return RESULT_ERROR; + } + + Bin[] bins = new Bin[values.size()]; + int index = 0; + + for (Map.Entry<String, ByteIterator> entry: values.entrySet()) { + bins[index] = new Bin(entry.getKey(), entry.getValue().toArray()); + ++index; + } + + int delay = WRITE_OVERLOAD_DELAY; + Key keyObj = new Key(namespace, table, key); + + while (true) { + try { + client.put(writePolicy, keyObj, bins); + writeOverloadTries = WRITE_OVERLOAD_TRIES; + return RESULT_OK; + } catch (AerospikeException e) { + if (e.getResultCode() != ResultCode.DEVICE_OVERLOAD) { + System.err.println("Error while updating key " + key + ": " + e); + return RESULT_ERROR; + } + + if (--writeOverloadTries == 0) { + if (DEBUG) { + System.err.println("Device overload: " + e); + } + + return RESULT_ERROR; + } + + try { + Thread.sleep(delay); + } catch (InterruptedException e2) { + if (DEBUG) { + System.err.println("Interrupted: " + e2); + } + } + + delay *= 2; + } + } + } + + @Override + public int update(String table, String key, + HashMap<String, ByteIterator> values) { + return write(table, key, updatePolicy, values); + } + + @Override + public int insert(String table, String key, + HashMap<String, ByteIterator> values) { + return write(table, key, insertPolicy, values); + } + + @Override + public int delete(String table, String key) { + try { + if (!client.delete(deletePolicy, new Key(namespace, table, key))) { + if (DEBUG) { + System.err.println("Record key " + key + " not found (delete)"); + } + + return RESULT_ERROR; + } + + return RESULT_OK; + } catch (AerospikeException e) { + System.err.println("Error while deleting key " + key + ": " + e); + return RESULT_ERROR; + } + } +} diff --git a/bin/ycsb b/bin/ycsb index d28c3f8040949b2ee69bb87f0c2c6d32c16e6c40..2a71e6e2b29e0edb97c690fbfd7fafdb66d86959 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -28,6 +28,7 @@ COMMANDS = { DATABASES = { "accumulo" : "com.yahoo.ycsb.db.AccumuloClient", + "aerospike" : "com.yahoo.ycsb.db.AerospikeClient", "basic" : "com.yahoo.ycsb.BasicDB", "cassandra-7" : "com.yahoo.ycsb.db.CassandraClient7", "cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8", diff --git a/distribution/pom.xml b/distribution/pom.xml index 9eb0827827f622fcdc55a1ed0e1db0542e31319f..09ceb97797769ad1607747c1ba93cf452c5a3ad5 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -27,6 +27,11 @@ <artifactId>accumulo-binding</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>aerospike-binding</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>com.yahoo.ycsb</groupId> <artifactId>cassandra-binding</artifactId> diff --git a/pom.xml b/pom.xml index b88e963e26eb71246ccb1f34fed27bea52b0cb81..50979d3fd59897bb0052a50778ece1068e081de7 100644 --- a/pom.xml +++ b/pom.xml @@ -70,6 +70,7 @@ <hypertable.version>0.9.5.6</hypertable.version> <couchbase.version>1.1.8</couchbase.version> <tarantool.version>1.6.1</tarantool.version> + <aerospike.version>3.1.2</aerospike.version> </properties> <modules> @@ -78,6 +79,7 @@ <module>binding-parent</module> <!-- all the datastore bindings, lex sorted please --> <module>accumulo</module> + <module>aerospike</module> <module>cassandra</module> <module>couchbase</module> <module>distribution</module>