diff --git a/.travis.yml b/.travis.yml index 06dc80c1b7cd9ecce8717a1cddcab6906b0747be..0726f500878634d24d6a85532c6f29814ad03565 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,6 +24,11 @@ jdk: - oraclejdk8 - openjdk7 +addons: + hosts: + - myshorthost + hostname: myshorthost + install: mvn install -q -DskipTests=true script: mvn test -q @@ -35,5 +40,5 @@ services: # - riak -# Use the Container based infrastructure. -sudo: false +# Can't use container based infra because of hosts/hostname +sudo: true diff --git a/accumulo1.6/README.md b/accumulo1.6/README.md new file mode 100644 index 0000000000000000000000000000000000000000..b10a719c2841193b4a811e1c4f2068cc4c41ef30 --- /dev/null +++ b/accumulo1.6/README.md @@ -0,0 +1,118 @@ +<!-- +Copyright (c) 2015 YCSB contributors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +## Quick Start + +This section describes how to run YCSB on [Accumulo](https://accumulo.apache.org/). + +### 1. Start Accumulo + +See the [Accumulo Documentation](https://accumulo.apache.org/1.6/accumulo_user_manual.html#_installation) +for details on installing and running Accumulo. + +Before running the YCSB test you must create the Accumulo table. Again see the +[Accumulo Documentation](https://accumulo.apache.org/1.6/accumulo_user_manual.html#_basic_administration) +for details. The default table name is `ycsb`. + +### 2. 
Set Up YCSB
+
+Git clone YCSB and compile:
+
+    git clone http://github.com/brianfrankcooper/YCSB.git
+    cd YCSB
+    mvn -pl com.yahoo.ycsb:accumulo1.6-binding -am clean package
+
+### 3. Create the Accumulo table
+
+By default, YCSB uses a table with the name "usertable". Users must create this table before loading
+data into Accumulo. For maximum Accumulo performance, the Accumulo table must be pre-split. A simple
+Ruby script, based on the HBase README, can generate adequate split-points. 10's of Tablets per
+TabletServer is a good starting point. Unless otherwise specified, the following commands should run
+on any version of Accumulo.
+
+    $ echo 'num_splits = 20; puts (1..num_splits).map {|i| "user#{1000+i*(9999-1000)/num_splits}"}' | ruby > /tmp/splits.txt
+    $ accumulo shell -u <user> -p <password> -e "createtable usertable"
+    $ accumulo shell -u <user> -p <password> -e "addsplits -t usertable -sf /tmp/splits.txt"
+    $ accumulo shell -u <user> -p <password> -e "config -t usertable -s table.cache.block.enable=true"
+
+Additionally, there are some other configuration properties which can increase performance. These
+can be set on the Accumulo table via the shell after it is created. Setting the table durability
+to `flush` relaxes the constraints on data durability during hard power-outages (avoids calls
+to fsync). Accumulo defaults table compression to `gzip` which is not particularly fast; `snappy`
+is a faster and similarly-efficient option. The mutation queue property controls how many writes
+that Accumulo will buffer in memory before performing a flush; this property should be set relative
+to the amount of JVM heap the TabletServers are given.
+
+Please note that the `table.durability` and `tserver.total.mutation.queue.max` properties only
+exist for >=Accumulo-1.7. There are no concise replacements for these properties in earlier versions.
+
+    accumulo> config -s table.durability=flush
+    accumulo> config -s tserver.total.mutation.queue.max=256M
+    accumulo> config -t usertable -s table.file.compress.type=snappy
+
+On repeated data loads, the following commands may be helpful to re-set the state of the table quickly.
+
+    accumulo> createtable tmp --copy-splits usertable --copy-config usertable
+    accumulo> deletetable --force usertable
+    accumulo> renametable tmp usertable
+    accumulo> compact --wait -t accumulo.metadata
+
+### 4. Load Data and Run Tests
+
+Load the data:
+
+    ./bin/ycsb load accumulo1.6 -s -P workloads/workloada \
+      -p accumulo.zooKeepers=localhost \
+      -p accumulo.columnFamily=ycsb \
+      -p accumulo.instanceName=ycsb \
+      -p accumulo.username=user \
+      -p accumulo.password=supersecret \
+      > outputLoad.txt
+
+Run the workload test:
+
+    ./bin/ycsb run accumulo1.6 -s -P workloads/workloada \
+      -p accumulo.zooKeepers=localhost \
+      -p accumulo.columnFamily=ycsb \
+      -p accumulo.instanceName=ycsb \
+      -p accumulo.username=user \
+      -p accumulo.password=supersecret \
+      > outputRun.txt
+
+## Accumulo Configuration Parameters
+
+- `accumulo.zooKeepers`
+  - The Accumulo cluster's [zookeeper servers](https://accumulo.apache.org/1.6/accumulo_user_manual.html#_connecting).
+  - Should contain a comma separated list of hostname or hostname:port values.
+  - No default value.
+
+- `accumulo.columnFamily`
+  - The name of the column family to use to store the data within the table.
+  - No default value.
+
+- `accumulo.instanceName`
+  - Name of the Accumulo [instance](https://accumulo.apache.org/1.6/accumulo_user_manual.html#_connecting).
+  - No default value.
+
+- `accumulo.username`
+  - The username to use when connecting to Accumulo.
+  - No default value.
+
+- `accumulo.password`
+  - The password for the user connecting to Accumulo.
+  - No default value.
+ diff --git a/accumulo/pom.xml b/accumulo1.6/pom.xml similarity index 94% rename from accumulo/pom.xml rename to accumulo1.6/pom.xml index 39b239b8f9516742074dd660d0c1a51f35e5b7af..3c926656a0924873c7612dca8cf8d4559b6d366d 100644 --- a/accumulo/pom.xml +++ b/accumulo1.6/pom.xml @@ -25,8 +25,8 @@ LICENSE file. <version>0.14.0-SNAPSHOT</version> <relativePath>../binding-parent</relativePath> </parent> - <artifactId>accumulo-binding</artifactId> - <name>Accumulo DB Binding</name> + <artifactId>accumulo1.6-binding</artifactId> + <name>Accumulo 1.6 DB Binding</name> <properties> <!-- This should match up to the one from your Accumulo version --> <hadoop.version>2.2.0</hadoop.version> @@ -37,7 +37,7 @@ LICENSE file. <dependency> <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-core</artifactId> - <version>${accumulo.version}</version> + <version>${accumulo.1.6.version}</version> </dependency> <!-- Needed for hadoop.io.Text :( --> <dependency> @@ -66,7 +66,7 @@ LICENSE file. <dependency> <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-minicluster</artifactId> - <version>${accumulo.version}</version> + <version>${accumulo.1.6.version}</version> <scope>test</scope> </dependency> <!-- needed directly only in test, but transitive diff --git a/accumulo/src/main/conf/accumulo.properties b/accumulo1.6/src/main/conf/accumulo.properties similarity index 100% rename from accumulo/src/main/conf/accumulo.properties rename to accumulo1.6/src/main/conf/accumulo.properties diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java b/accumulo1.6/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java similarity index 100% rename from accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java rename to accumulo1.6/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java b/accumulo1.6/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java 
similarity index 100% rename from accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java rename to accumulo1.6/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java diff --git a/accumulo/src/test/java/com/yahoo/ycsb/db/accumulo/AccumuloTest.java b/accumulo1.6/src/test/java/com/yahoo/ycsb/db/accumulo/AccumuloTest.java similarity index 100% rename from accumulo/src/test/java/com/yahoo/ycsb/db/accumulo/AccumuloTest.java rename to accumulo1.6/src/test/java/com/yahoo/ycsb/db/accumulo/AccumuloTest.java diff --git a/accumulo/src/test/resources/log4j.properties b/accumulo1.6/src/test/resources/log4j.properties similarity index 100% rename from accumulo/src/test/resources/log4j.properties rename to accumulo1.6/src/test/resources/log4j.properties diff --git a/accumulo/README.md b/accumulo1.7/README.md similarity index 97% rename from accumulo/README.md rename to accumulo1.7/README.md index 38e444cb7c522626d4d12a42a470f62fa83ba6f7..3c6dca51cfaed8e67397ba3caefc9485b3fd1974 100644 --- a/accumulo/README.md +++ b/accumulo1.7/README.md @@ -75,7 +75,7 @@ On repeated data loads, the following commands may be helpful to re-set the stat Load the data: - ./bin/ycsb load accumulo -s -P workloads/workloada \ + ./bin/ycsb load accumulo1.7 -s -P workloads/workloada \ -p accumulo.zooKeepers=localhost \ -p accumulo.columnFamily=ycsb \ -p accumulo.instanceName=ycsb \ @@ -85,7 +85,7 @@ Load the data: Run the workload test: - ./bin/ycsb run accumulo -s -P workloads/workloada \ + ./bin/ycsb run accumulo1.7 -s -P workloads/workloada \ -p accumulo.zooKeepers=localhost \ -p accumulo.columnFamily=ycsb \ -p accumulo.instanceName=ycsb \ diff --git a/accumulo1.7/pom.xml b/accumulo1.7/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..cc2967cee6ba10bab2bc3f0f7451f7b34391cfd2 --- /dev/null +++ b/accumulo1.7/pom.xml @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Copyright (c) 2011 YCSB++ project, 2014 - 2016 YCSB contributors. 
+All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>binding-parent</artifactId> + <version>0.14.0-SNAPSHOT</version> + <relativePath>../binding-parent</relativePath> + </parent> + <artifactId>accumulo1.7-binding</artifactId> + <name>Accumulo 1.7 DB Binding</name> + <properties> + <!-- This should match up to the one from your Accumulo version --> + <hadoop.version>2.2.0</hadoop.version> + <!-- Tests do not run on jdk9 --> + <skipJDK9Tests>true</skipJDK9Tests> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-core</artifactId> + <version>${accumulo.1.7.version}</version> + </dependency> + <!-- Needed for hadoop.io.Text :( --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>jdk.tools</groupId> + <artifactId>jdk.tools</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + 
<groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-minicluster</artifactId> + <version>${accumulo.1.7.version}</version> + <scope>test</scope> + </dependency> + <!-- needed directly only in test, but transitive + at runtime for accumulo, hadoop, and thrift. --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.13</version> + </dependency> + </dependencies> + <build> + <testResources> + <testResource> + <directory>../workloads</directory> + <targetPath>workloads</targetPath> + </testResource> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + </build> +</project> diff --git a/accumulo1.7/src/main/conf/accumulo.properties b/accumulo1.7/src/main/conf/accumulo.properties new file mode 100644 index 0000000000000000000000000000000000000000..191ad416d25b2b14531925c1b76a5ba4cbf6870a --- /dev/null +++ b/accumulo1.7/src/main/conf/accumulo.properties @@ -0,0 +1,44 @@ +# Copyright 2014 Cloudera, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. See accompanying +# LICENSE file. +# +# Sample Accumulo configuration properties +# +# You may either set properties here or via the command line. 
+# + +# This will influence the keys we write +accumulo.columnFamily=YCSB + +# This should be set based on your Accumulo cluster +#accumulo.instanceName=ExampleInstance + +# Comma separated list of host:port tuples for the ZooKeeper quorum used +# by your Accumulo cluster +#accumulo.zooKeepers=zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181 + +# This user will need permissions on the table YCSB works against +#accumulo.username=ycsb +#accumulo.password=protectyaneck + +# Controls how long our client writer will wait to buffer more data +# measured in milliseconds +accumulo.batchWriterMaxLatency=30000 + +# Controls how much data our client will attempt to buffer before sending +# measured in bytes +accumulo.batchWriterSize=100000 + +# Controls how many worker threads our client will use to parallelize writes +accumulo.batchWriterThreads=1 diff --git a/accumulo1.7/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java b/accumulo1.7/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java new file mode 100644 index 0000000000000000000000000000000000000000..e260b9afa48d69352bd49f2a54b650591c1bd19e --- /dev/null +++ b/accumulo1.7/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java @@ -0,0 +1,372 @@ +/** + * Copyright (c) 2011 YCSB++ project, 2014-2016 YCSB contributors. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ + +package com.yahoo.ycsb.db.accumulo; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.Vector; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.CleanUp; +import org.apache.hadoop.io.Text; + +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.Status; + +/** + * <a href="https://accumulo.apache.org/">Accumulo</a> binding for YCSB. 
+ */ +public class AccumuloClient extends DB { + + private ZooKeeperInstance inst; + private Connector connector; + private Text colFam = new Text(""); + private byte[] colFamBytes = new byte[0]; + private final ConcurrentHashMap<String, BatchWriter> writers = new ConcurrentHashMap<>(); + + static { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + CleanUp.shutdownNow(); + } + }); + } + + @Override + public void init() throws DBException { + colFam = new Text(getProperties().getProperty("accumulo.columnFamily")); + colFamBytes = colFam.toString().getBytes(UTF_8); + + inst = new ZooKeeperInstance(new ClientConfiguration() + .withInstance(getProperties().getProperty("accumulo.instanceName")) + .withZkHosts(getProperties().getProperty("accumulo.zooKeepers"))); + try { + String principal = getProperties().getProperty("accumulo.username"); + AuthenticationToken token = + new PasswordToken(getProperties().getProperty("accumulo.password")); + connector = inst.getConnector(principal, token); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new DBException(e); + } + + if (!(getProperties().getProperty("accumulo.pcFlag", "none").equals("none"))) { + System.err.println("Sorry, the ZK based producer/consumer implementation has been removed. " + + "Please see YCSB issue #416 for work on adding a general solution to coordinated work."); + } + } + + @Override + public void cleanup() throws DBException { + try { + Iterator<BatchWriter> iterator = writers.values().iterator(); + while (iterator.hasNext()) { + BatchWriter writer = iterator.next(); + writer.close(); + iterator.remove(); + } + } catch (MutationsRejectedException e) { + throw new DBException(e); + } + } + + /** + * Called when the user specifies a table that isn't the same as the existing + * table. Connect to it and if necessary, close our current connection. + * + * @param table + * The table to open. 
+ */ + public BatchWriter getWriter(String table) throws TableNotFoundException { + // tl;dr We're paying a cost for the ConcurrentHashMap here to deal with the DB api. + // We know that YCSB is really only ever going to send us data for one table, so using + // a concurrent data structure is overkill (especially in such a hot code path). + // However, the impact seems to be relatively negligible in trivial local tests and it's + // "more correct" WRT to the API. + BatchWriter writer = writers.get(table); + if (null == writer) { + BatchWriter newWriter = createBatchWriter(table); + BatchWriter oldWriter = writers.putIfAbsent(table, newWriter); + // Someone beat us to creating a BatchWriter for this table, use their BatchWriters + if (null != oldWriter) { + try { + // Make sure to clean up our new batchwriter! + newWriter.close(); + } catch (MutationsRejectedException e) { + throw new RuntimeException(e); + } + writer = oldWriter; + } else { + writer = newWriter; + } + } + return writer; + } + + /** + * Creates a BatchWriter with the expected configuration. + * + * @param table The table to write to + */ + private BatchWriter createBatchWriter(String table) throws TableNotFoundException { + BatchWriterConfig bwc = new BatchWriterConfig(); + bwc.setMaxLatency( + Long.parseLong(getProperties() + .getProperty("accumulo.batchWriterMaxLatency", "30000")), + TimeUnit.MILLISECONDS); + bwc.setMaxMemory(Long.parseLong( + getProperties().getProperty("accumulo.batchWriterSize", "100000"))); + final String numThreadsValue = getProperties().getProperty("accumulo.batchWriterThreads"); + // Try to saturate the client machine. 
+ int numThreads = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + if (null != numThreadsValue) { + numThreads = Integer.parseInt(numThreadsValue); + } + System.err.println("Using " + numThreads + " threads to write data"); + bwc.setMaxWriteThreads(numThreads); + return connector.createBatchWriter(table, bwc); + } + + /** + * Gets a scanner from Accumulo over one row. + * + * @param row the row to scan + * @param fields the set of columns to scan + * @return an Accumulo {@link Scanner} bound to the given row and columns + */ + private Scanner getRow(String table, Text row, Set<String> fields) throws TableNotFoundException { + Scanner scanner = connector.createScanner(table, Authorizations.EMPTY); + scanner.setRange(new Range(row)); + if (fields != null) { + for (String field : fields) { + scanner.fetchColumn(colFam, new Text(field)); + } + } + return scanner; + } + + @Override + public Status read(String table, String key, Set<String> fields, + Map<String, ByteIterator> result) { + + Scanner scanner = null; + try { + scanner = getRow(table, new Text(key), null); + // Pick out the results we care about. + final Text cq = new Text(); + for (Entry<Key, Value> entry : scanner) { + entry.getKey().getColumnQualifier(cq); + Value v = entry.getValue(); + byte[] buf = v.get(); + result.put(cq.toString(), + new ByteArrayByteIterator(buf)); + } + } catch (Exception e) { + System.err.println("Error trying to reading Accumulo table " + table + " " + key); + e.printStackTrace(); + return Status.ERROR; + } finally { + if (null != scanner) { + scanner.close(); + } + } + return Status.OK; + + } + + @Override + public Status scan(String table, String startkey, int recordcount, + Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { + // Just make the end 'infinity' and only read as much as we need. 
+ Scanner scanner = null; + try { + scanner = connector.createScanner(table, Authorizations.EMPTY); + scanner.setRange(new Range(new Text(startkey), null)); + + // Have Accumulo send us complete rows, serialized in a single Key-Value pair + IteratorSetting cfg = new IteratorSetting(100, WholeRowIterator.class); + scanner.addScanIterator(cfg); + + // If no fields are provided, we assume one column/row. + if (fields != null) { + // And add each of them as fields we want. + for (String field : fields) { + scanner.fetchColumn(colFam, new Text(field)); + } + } + + int count = 0; + for (Entry<Key, Value> entry : scanner) { + // Deserialize the row + SortedMap<Key, Value> row = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue()); + HashMap<String, ByteIterator> rowData; + if (null != fields) { + rowData = new HashMap<>(fields.size()); + } else { + rowData = new HashMap<>(); + } + result.add(rowData); + // Parse the data in the row, avoid unnecessary Text object creation + final Text cq = new Text(); + for (Entry<Key, Value> rowEntry : row.entrySet()) { + rowEntry.getKey().getColumnQualifier(cq); + rowData.put(cq.toString(), new ByteArrayByteIterator(rowEntry.getValue().get())); + } + if (count++ == recordcount) { // Done reading the last row. 
+ break; + } + } + } catch (TableNotFoundException e) { + System.err.println("Error trying to connect to Accumulo table."); + e.printStackTrace(); + return Status.ERROR; + } catch (IOException e) { + System.err.println("Error deserializing data from Accumulo."); + e.printStackTrace(); + return Status.ERROR; + } finally { + if (null != scanner) { + scanner.close(); + } + } + + return Status.OK; + } + + @Override + public Status update(String table, String key, + Map<String, ByteIterator> values) { + BatchWriter bw = null; + try { + bw = getWriter(table); + } catch (TableNotFoundException e) { + System.err.println("Error opening batch writer to Accumulo table " + table); + e.printStackTrace(); + return Status.ERROR; + } + + Mutation mutInsert = new Mutation(key.getBytes(UTF_8)); + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { + mutInsert.put(colFamBytes, entry.getKey().getBytes(UTF_8), entry.getValue().toArray()); + } + + try { + bw.addMutation(mutInsert); + } catch (MutationsRejectedException e) { + System.err.println("Error performing update."); + e.printStackTrace(); + return Status.ERROR; + } + + return Status.BATCHED_OK; + } + + @Override + public Status insert(String t, String key, + Map<String, ByteIterator> values) { + return update(t, key, values); + } + + @Override + public Status delete(String table, String key) { + BatchWriter bw; + try { + bw = getWriter(table); + } catch (TableNotFoundException e) { + System.err.println("Error trying to connect to Accumulo table."); + e.printStackTrace(); + return Status.ERROR; + } + + try { + deleteRow(table, new Text(key), bw); + } catch (TableNotFoundException | MutationsRejectedException e) { + System.err.println("Error performing delete."); + e.printStackTrace(); + return Status.ERROR; + } catch (RuntimeException e) { + System.err.println("Error performing delete."); + e.printStackTrace(); + return Status.ERROR; + } + + return Status.OK; + } + + // These functions are adapted from 
RowOperations.java: + private void deleteRow(String table, Text row, BatchWriter bw) throws MutationsRejectedException, + TableNotFoundException { + // TODO Use a batchDeleter instead + deleteRow(getRow(table, row, null), bw); + } + + /** + * Deletes a row, given a Scanner of JUST that row. + */ + private void deleteRow(Scanner scanner, BatchWriter bw) throws MutationsRejectedException { + Mutation deleter = null; + // iterate through the keys + final Text row = new Text(); + final Text cf = new Text(); + final Text cq = new Text(); + for (Entry<Key, Value> entry : scanner) { + // create a mutation for the row + if (deleter == null) { + entry.getKey().getRow(row); + deleter = new Mutation(row); + } + entry.getKey().getColumnFamily(cf); + entry.getKey().getColumnQualifier(cq); + // the remove function adds the key with the delete flag set to true + deleter.putDelete(cf, cq); + } + + bw.addMutation(deleter); + } +} diff --git a/accumulo1.7/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java b/accumulo1.7/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..e38d200c774c03138f6ba8f642b2d2fe9bebc578 --- /dev/null +++ b/accumulo1.7/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java @@ -0,0 +1,22 @@ +/** + * Copyright (c) 2015 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ + +/** + * YCSB binding for <a href="https://accumulo.apache.org/">Apache Accumulo</a>. + */ +package com.yahoo.ycsb.db.accumulo; + diff --git a/accumulo1.7/src/test/java/com/yahoo/ycsb/db/accumulo/AccumuloTest.java b/accumulo1.7/src/test/java/com/yahoo/ycsb/db/accumulo/AccumuloTest.java new file mode 100644 index 0000000000000000000000000000000000000000..ce0b160f785ff94c6301d68a99b1e644289d7233 --- /dev/null +++ b/accumulo1.7/src/test/java/com/yahoo/ycsb/db/accumulo/AccumuloTest.java @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2016 YCSB contributors. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ + +package com.yahoo.ycsb.db.accumulo; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +import java.util.Map.Entry; +import java.util.Properties; + +import com.yahoo.ycsb.Workload; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.measurements.Measurements; +import com.yahoo.ycsb.workloads.CoreWorkload; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Use an Accumulo MiniCluster to test out basic workload operations with + * the Accumulo binding. 
+ */ +public class AccumuloTest { + private static final Logger LOG = LoggerFactory.getLogger(AccumuloTest.class); + private static final int INSERT_COUNT = 2000; + private static final int TRANSACTION_COUNT = 2000; + + @ClassRule + public static TemporaryFolder workingDir = new TemporaryFolder(); + @Rule + public TestName test = new TestName(); + + private static MiniAccumuloCluster cluster; + private static Properties properties; + private Workload workload; + private DB client; + private Properties workloadProps; + + private static boolean isWindows() { + final String os = System.getProperty("os.name"); + return os.startsWith("Windows"); + } + + @BeforeClass + public static void setup() throws Exception { + // Minicluster setup fails on Windows with an UnsatisfiedLinkError. + // Skip if windows. + assumeTrue(!isWindows()); + cluster = new MiniAccumuloCluster(workingDir.newFolder("accumulo").getAbsoluteFile(), "protectyaneck"); + LOG.debug("starting minicluster"); + cluster.start(); + LOG.debug("creating connection for admin operations."); + // set up the table and user + final Connector admin = cluster.getConnector("root", "protectyaneck"); + admin.tableOperations().create(CoreWorkload.TABLENAME_PROPERTY_DEFAULT); + admin.securityOperations().createLocalUser("ycsb", new PasswordToken("protectyaneck")); + admin.securityOperations().grantTablePermission("ycsb", CoreWorkload.TABLENAME_PROPERTY_DEFAULT, TablePermission.READ); + admin.securityOperations().grantTablePermission("ycsb", CoreWorkload.TABLENAME_PROPERTY_DEFAULT, TablePermission.WRITE); + + // set properties the binding will read + properties = new Properties(); + properties.setProperty("accumulo.zooKeepers", cluster.getZooKeepers()); + properties.setProperty("accumulo.instanceName", cluster.getInstanceName()); + properties.setProperty("accumulo.columnFamily", "family"); + properties.setProperty("accumulo.username", "ycsb"); + properties.setProperty("accumulo.password", "protectyaneck"); + // cut down the 
batch writer timeout so that writes will push through. + properties.setProperty("accumulo.batchWriterMaxLatency", "4"); + // set these explicitly to the defaults at the time we're compiled, since they'll be inlined in our class. + properties.setProperty(CoreWorkload.TABLENAME_PROPERTY, CoreWorkload.TABLENAME_PROPERTY_DEFAULT); + properties.setProperty(CoreWorkload.FIELD_COUNT_PROPERTY, CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT); + properties.setProperty(CoreWorkload.INSERT_ORDER_PROPERTY, "ordered"); + } + + @AfterClass + public static void clusterCleanup() throws Exception { + if (cluster != null) { + LOG.debug("shutting down minicluster"); + cluster.stop(); + cluster = null; + } + } + + @Before + public void client() throws Exception { + + LOG.debug("Loading workload properties for {}", test.getMethodName()); + workloadProps = new Properties(); + workloadProps.load(getClass().getResourceAsStream("/workloads/" + test.getMethodName())); + + for (String prop : properties.stringPropertyNames()) { + workloadProps.setProperty(prop, properties.getProperty(prop)); + } + + // TODO we need a better test rig for 'run this ycsb workload' + LOG.debug("initializing measurements and workload"); + Measurements.setProperties(workloadProps); + workload = new CoreWorkload(); + workload.init(workloadProps); + + LOG.debug("initializing client"); + client = new AccumuloClient(); + client.setProperties(workloadProps); + client.init(); + } + + @After + public void cleanup() throws Exception { + if (client != null) { + LOG.debug("cleaning up client"); + client.cleanup(); + client = null; + } + if (workload != null) { + LOG.debug("cleaning up workload"); + workload.cleanup(); + } + } + + @After + public void truncateTable() throws Exception { + if (cluster != null) { + LOG.debug("truncating table {}", CoreWorkload.TABLENAME_PROPERTY_DEFAULT); + final Connector admin = cluster.getConnector("root", "protectyaneck"); + 
admin.tableOperations().deleteRows(CoreWorkload.TABLENAME_PROPERTY_DEFAULT, null, null); + } + } + + @Test + public void workloada() throws Exception { + runWorkload(); + } + + @Test + public void workloadb() throws Exception { + runWorkload(); + } + + @Test + public void workloadc() throws Exception { + runWorkload(); + } + + @Test + public void workloadd() throws Exception { + runWorkload(); + } + + @Test + public void workloade() throws Exception { + runWorkload(); + } + + /** + * go through a workload cycle. + * <ol> + * <li>initialize thread-specific state + * <li>load the workload dataset + * <li>run workload transactions + * </ol> + */ + private void runWorkload() throws Exception { + final Object state = workload.initThread(workloadProps,0,0); + LOG.debug("load"); + for (int i = 0; i < INSERT_COUNT; i++) { + assertTrue("insert failed.", workload.doInsert(client, state)); + } + // Ensure we wait long enough for the batch writer to flush + // TODO accumulo client should be flushing per insert by default. + Thread.sleep(2000); + LOG.debug("verify number of cells"); + final Scanner scanner = cluster.getConnector("root", "protectyaneck").createScanner(CoreWorkload.TABLENAME_PROPERTY_DEFAULT, Authorizations.EMPTY); + int count = 0; + for (Entry<Key, Value> entry : scanner) { + count++; + } + assertEquals("Didn't get enough total cells.", (Integer.valueOf(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT) * INSERT_COUNT), count); + LOG.debug("run"); + for (int i = 0; i < TRANSACTION_COUNT; i++) { + assertTrue("transaction failed.", workload.doTransaction(client, state)); + } + } +} diff --git a/accumulo1.7/src/test/resources/log4j.properties b/accumulo1.7/src/test/resources/log4j.properties new file mode 100644 index 0000000000000000000000000000000000000000..2d48dce5a34914b9470a4948e4955b342bc03cbc --- /dev/null +++ b/accumulo1.7/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Copyright (c) 2015 YCSB contributors. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. See accompanying +# LICENSE file. +# + +# Root logger option +log4j.rootLogger=INFO, stderr + +log4j.appender.stderr=org.apache.log4j.ConsoleAppender +log4j.appender.stderr.target=System.err +log4j.appender.stderr.layout=org.apache.log4j.PatternLayout +log4j.appender.stderr.layout.conversionPattern=%d{yyyy/MM/dd HH:mm:ss} %-5p %c %x - %m%n + +# Suppress messages from ZooKeeper +log4j.logger.com.yahoo.ycsb.db.accumulo=DEBUG +log4j.logger.org.apache.zookeeper=ERROR +log4j.logger.org.apache.accumulo=WARN diff --git a/accumulo1.8/README.md b/accumulo1.8/README.md new file mode 100644 index 0000000000000000000000000000000000000000..0c4039392d49a9eef07ee2d60d6ebb1b8090e459 --- /dev/null +++ b/accumulo1.8/README.md @@ -0,0 +1,118 @@ +<!-- +Copyright (c) 2015 YCSB contributors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. 
+--> + +## Quick Start + +This section describes how to run YCSB on [Accumulo](https://accumulo.apache.org/). + +### 1. Start Accumulo + +See the [Accumulo Documentation](https://accumulo.apache.org/1.8/accumulo_user_manual.html#_installation) +for details on installing and running Accumulo. + +Before running the YCSB test you must create the Accumulo table. Again see the +[Accumulo Documentation](https://accumulo.apache.org/1.8/accumulo_user_manual.html#_basic_administration) +for details. The default table name is `ycsb`. + +### 2. Set Up YCSB + +Git clone YCSB and compile: + + git clone http://github.com/brianfrankcooper/YCSB.git + cd YCSB + mvn -pl com.yahoo.ycsb:accumulo1.8-binding -am clean package + +### 3. Create the Accumulo table + +By default, YCSB uses a table with the name "usertable". Users must create this table before loading +data into Accumulo. For maximum Accumulo performance, the Accumulo table must be pre-split. A simple +Ruby script, based on the HBase README, can generate adequate split-points. 10's of Tablets per +TabletServer is a good starting point. Unless otherwise specified, the following commands should run +on any version of Accumulo. + + $ echo 'num_splits = 20; puts (1..num_splits).map {|i| "user#{1000+i*(9999-1000)/num_splits}"}' | ruby > /tmp/splits.txt + $ accumulo shell -u <user> -p <password> -e "createtable usertable" + $ accumulo shell -u <user> -p <password> -e "addsplits -t usertable -sf /tmp/splits.txt" + $ accumulo shell -u <user> -p <password> -e "config -t usertable -s table.cache.block.enable=true" + +Additionally, there are some other configuration properties which can increase performance. These +can be set on the Accumulo table via the shell after it is created. Setting the table durability +to `flush` relaxes the constraints on data durability during hard power-outages (avoids calls +to fsync). 
Accumulo defaults table compression to `gzip` which is not particularly fast; `snappy` +is a faster and similarly-efficient option. The mutation queue property controls how many writes +that Accumulo will buffer in memory before performing a flush; this property should be set relative +to the amount of JVM heap the TabletServers are given. + +Please note that the `table.durability` and `tserver.total.mutation.queue.max` properties only +exist for >=Accumulo-1.7. There are no concise replacements for these properties in earlier versions. + + accumulo> config -s table.durability=flush + accumulo> config -s tserver.total.mutation.queue.max=256M + accumulo> config -t usertable -s table.file.compress.type=snappy + +On repeated data loads, the following commands may be helpful to re-set the state of the table quickly. + + accumulo> createtable tmp --copy-splits usertable --copy-config usertable + accumulo> deletetable --force usertable + accumulo> renametable tmp usertable + accumulo> compact --wait -t accumulo.metadata + +### 4. Load Data and Run Tests + +Load the data: + + ./bin/ycsb load accumulo1.8 -s -P workloads/workloada \ + -p accumulo.zooKeepers=localhost \ + -p accumulo.columnFamily=ycsb \ + -p accumulo.instanceName=ycsb \ + -p accumulo.username=user \ + -p accumulo.password=supersecret \ + > outputLoad.txt + +Run the workload test: + + ./bin/ycsb run accumulo1.8 -s -P workloads/workloada \ + -p accumulo.zooKeepers=localhost \ + -p accumulo.columnFamily=ycsb \ + -p accumulo.instanceName=ycsb \ + -p accumulo.username=user \ + -p accumulo.password=supersecret \ + > outputRun.txt + +## Accumulo Configuration Parameters + +- `accumulo.zooKeepers` + - The Accumulo cluster's [zookeeper servers](https://accumulo.apache.org/1.8/accumulo_user_manual.html#_connecting). + - Should contain a comma separated list of hostname or hostname:port values. + - No default value. 
+ +- `accumulo.columnFamily` + - The name of the column family to use to store the data within the table. + - No default value. + +- `accumulo.instanceName` + - Name of the Accumulo [instance](https://accumulo.apache.org/1.8/accumulo_user_manual.html#_connecting). + - No default value. + +- `accumulo.username` + - The username to use when connecting to Accumulo. + - No default value. + +- `accumulo.password` + - The password for the user connecting to Accumulo. + - No default value. + diff --git a/accumulo1.8/pom.xml b/accumulo1.8/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..9ad74b036409bf858e63894b469176f2c4be9d8a --- /dev/null +++ b/accumulo1.8/pom.xml @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Copyright (c) 2011 YCSB++ project, 2014 - 2016 YCSB contributors. +All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>binding-parent</artifactId> + <version>0.14.0-SNAPSHOT</version> + <relativePath>../binding-parent</relativePath> + </parent> + <artifactId>accumulo1.8-binding</artifactId> + <name>Accumulo 1.8 DB Binding</name> + <properties> + <!-- This should match up to the one from your Accumulo version --> + <hadoop.version>2.6.4</hadoop.version> + <!-- Tests do not run on jdk9 --> + <skipJDK9Tests>true</skipJDK9Tests> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-core</artifactId> + <version>${accumulo.1.8.version}</version> + </dependency> + <!-- Needed for hadoop.io.Text :( --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>jdk.tools</groupId> + <artifactId>jdk.tools</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-minicluster</artifactId> + <version>${accumulo.1.8.version}</version> + <scope>test</scope> + </dependency> + <!-- needed directly only in test, but transitive + at runtime for accumulo, hadoop, and thrift. 
--> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.13</version> + </dependency> + </dependencies> + <build> + <testResources> + <testResource> + <directory>../workloads</directory> + <targetPath>workloads</targetPath> + </testResource> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + </build> +</project> diff --git a/accumulo1.8/src/main/conf/accumulo.properties b/accumulo1.8/src/main/conf/accumulo.properties new file mode 100644 index 0000000000000000000000000000000000000000..191ad416d25b2b14531925c1b76a5ba4cbf6870a --- /dev/null +++ b/accumulo1.8/src/main/conf/accumulo.properties @@ -0,0 +1,44 @@ +# Copyright 2014 Cloudera, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. See accompanying +# LICENSE file. +# +# Sample Accumulo configuration properties +# +# You may either set properties here or via the command line. 
+# + +# This will influence the keys we write +accumulo.columnFamily=YCSB + +# This should be set based on your Accumulo cluster +#accumulo.instanceName=ExampleInstance + +# Comma separated list of host:port tuples for the ZooKeeper quorum used +# by your Accumulo cluster +#accumulo.zooKeepers=zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181 + +# This user will need permissions on the table YCSB works against +#accumulo.username=ycsb +#accumulo.password=protectyaneck + +# Controls how long our client writer will wait to buffer more data +# measured in milliseconds +accumulo.batchWriterMaxLatency=30000 + +# Controls how much data our client will attempt to buffer before sending +# measured in bytes +accumulo.batchWriterSize=100000 + +# Controls how many worker threads our client will use to parallelize writes +accumulo.batchWriterThreads=1 diff --git a/accumulo1.8/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java b/accumulo1.8/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java new file mode 100644 index 0000000000000000000000000000000000000000..e260b9afa48d69352bd49f2a54b650591c1bd19e --- /dev/null +++ b/accumulo1.8/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java @@ -0,0 +1,372 @@ +/** + * Copyright (c) 2011 YCSB++ project, 2014-2016 YCSB contributors. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ + +package com.yahoo.ycsb.db.accumulo; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.Vector; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.CleanUp; +import org.apache.hadoop.io.Text; + +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.Status; + +/** + * <a href="https://accumulo.apache.org/">Accumulo</a> binding for YCSB. 
+ */ +public class AccumuloClient extends DB { + + private ZooKeeperInstance inst; + private Connector connector; + private Text colFam = new Text(""); + private byte[] colFamBytes = new byte[0]; + private final ConcurrentHashMap<String, BatchWriter> writers = new ConcurrentHashMap<>(); + + static { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + CleanUp.shutdownNow(); + } + }); + } + + @Override + public void init() throws DBException { + colFam = new Text(getProperties().getProperty("accumulo.columnFamily")); + colFamBytes = colFam.toString().getBytes(UTF_8); + + inst = new ZooKeeperInstance(new ClientConfiguration() + .withInstance(getProperties().getProperty("accumulo.instanceName")) + .withZkHosts(getProperties().getProperty("accumulo.zooKeepers"))); + try { + String principal = getProperties().getProperty("accumulo.username"); + AuthenticationToken token = + new PasswordToken(getProperties().getProperty("accumulo.password")); + connector = inst.getConnector(principal, token); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new DBException(e); + } + + if (!(getProperties().getProperty("accumulo.pcFlag", "none").equals("none"))) { + System.err.println("Sorry, the ZK based producer/consumer implementation has been removed. " + + "Please see YCSB issue #416 for work on adding a general solution to coordinated work."); + } + } + + @Override + public void cleanup() throws DBException { + try { + Iterator<BatchWriter> iterator = writers.values().iterator(); + while (iterator.hasNext()) { + BatchWriter writer = iterator.next(); + writer.close(); + iterator.remove(); + } + } catch (MutationsRejectedException e) { + throw new DBException(e); + } + } + + /** + * Called when the user specifies a table that isn't the same as the existing + * table. Connect to it and if necessary, close our current connection. + * + * @param table + * The table to open. 
+ */ + public BatchWriter getWriter(String table) throws TableNotFoundException { + // tl;dr We're paying a cost for the ConcurrentHashMap here to deal with the DB api. + // We know that YCSB is really only ever going to send us data for one table, so using + // a concurrent data structure is overkill (especially in such a hot code path). + // However, the impact seems to be relatively negligible in trivial local tests and it's + // "more correct" WRT to the API. + BatchWriter writer = writers.get(table); + if (null == writer) { + BatchWriter newWriter = createBatchWriter(table); + BatchWriter oldWriter = writers.putIfAbsent(table, newWriter); + // Someone beat us to creating a BatchWriter for this table, use their BatchWriters + if (null != oldWriter) { + try { + // Make sure to clean up our new batchwriter! + newWriter.close(); + } catch (MutationsRejectedException e) { + throw new RuntimeException(e); + } + writer = oldWriter; + } else { + writer = newWriter; + } + } + return writer; + } + + /** + * Creates a BatchWriter with the expected configuration. + * + * @param table The table to write to + */ + private BatchWriter createBatchWriter(String table) throws TableNotFoundException { + BatchWriterConfig bwc = new BatchWriterConfig(); + bwc.setMaxLatency( + Long.parseLong(getProperties() + .getProperty("accumulo.batchWriterMaxLatency", "30000")), + TimeUnit.MILLISECONDS); + bwc.setMaxMemory(Long.parseLong( + getProperties().getProperty("accumulo.batchWriterSize", "100000"))); + final String numThreadsValue = getProperties().getProperty("accumulo.batchWriterThreads"); + // Try to saturate the client machine. 
+ int numThreads = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + if (null != numThreadsValue) { + numThreads = Integer.parseInt(numThreadsValue); + } + System.err.println("Using " + numThreads + " threads to write data"); + bwc.setMaxWriteThreads(numThreads); + return connector.createBatchWriter(table, bwc); + } + + /** + * Gets a scanner from Accumulo over one row. + * + * @param row the row to scan + * @param fields the set of columns to scan + * @return an Accumulo {@link Scanner} bound to the given row and columns + */ + private Scanner getRow(String table, Text row, Set<String> fields) throws TableNotFoundException { + Scanner scanner = connector.createScanner(table, Authorizations.EMPTY); + scanner.setRange(new Range(row)); + if (fields != null) { + for (String field : fields) { + scanner.fetchColumn(colFam, new Text(field)); + } + } + return scanner; + } + + @Override + public Status read(String table, String key, Set<String> fields, + Map<String, ByteIterator> result) { + + Scanner scanner = null; + try { + scanner = getRow(table, new Text(key), null); + // Pick out the results we care about. + final Text cq = new Text(); + for (Entry<Key, Value> entry : scanner) { + entry.getKey().getColumnQualifier(cq); + Value v = entry.getValue(); + byte[] buf = v.get(); + result.put(cq.toString(), + new ByteArrayByteIterator(buf)); + } + } catch (Exception e) { + System.err.println("Error trying to reading Accumulo table " + table + " " + key); + e.printStackTrace(); + return Status.ERROR; + } finally { + if (null != scanner) { + scanner.close(); + } + } + return Status.OK; + + } + + @Override + public Status scan(String table, String startkey, int recordcount, + Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { + // Just make the end 'infinity' and only read as much as we need. 
+ Scanner scanner = null; + try { + scanner = connector.createScanner(table, Authorizations.EMPTY); + scanner.setRange(new Range(new Text(startkey), null)); + + // Have Accumulo send us complete rows, serialized in a single Key-Value pair + IteratorSetting cfg = new IteratorSetting(100, WholeRowIterator.class); + scanner.addScanIterator(cfg); + + // If no fields are provided, we assume one column/row. + if (fields != null) { + // And add each of them as fields we want. + for (String field : fields) { + scanner.fetchColumn(colFam, new Text(field)); + } + } + + int count = 0; + for (Entry<Key, Value> entry : scanner) { + // Deserialize the row + SortedMap<Key, Value> row = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue()); + HashMap<String, ByteIterator> rowData; + if (null != fields) { + rowData = new HashMap<>(fields.size()); + } else { + rowData = new HashMap<>(); + } + result.add(rowData); + // Parse the data in the row, avoid unnecessary Text object creation + final Text cq = new Text(); + for (Entry<Key, Value> rowEntry : row.entrySet()) { + rowEntry.getKey().getColumnQualifier(cq); + rowData.put(cq.toString(), new ByteArrayByteIterator(rowEntry.getValue().get())); + } + if (count++ == recordcount) { // Done reading the last row. 
+ break; + } + } + } catch (TableNotFoundException e) { + System.err.println("Error trying to connect to Accumulo table."); + e.printStackTrace(); + return Status.ERROR; + } catch (IOException e) { + System.err.println("Error deserializing data from Accumulo."); + e.printStackTrace(); + return Status.ERROR; + } finally { + if (null != scanner) { + scanner.close(); + } + } + + return Status.OK; + } + + @Override + public Status update(String table, String key, + Map<String, ByteIterator> values) { + BatchWriter bw = null; + try { + bw = getWriter(table); + } catch (TableNotFoundException e) { + System.err.println("Error opening batch writer to Accumulo table " + table); + e.printStackTrace(); + return Status.ERROR; + } + + Mutation mutInsert = new Mutation(key.getBytes(UTF_8)); + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { + mutInsert.put(colFamBytes, entry.getKey().getBytes(UTF_8), entry.getValue().toArray()); + } + + try { + bw.addMutation(mutInsert); + } catch (MutationsRejectedException e) { + System.err.println("Error performing update."); + e.printStackTrace(); + return Status.ERROR; + } + + return Status.BATCHED_OK; + } + + @Override + public Status insert(String t, String key, + Map<String, ByteIterator> values) { + return update(t, key, values); + } + + @Override + public Status delete(String table, String key) { + BatchWriter bw; + try { + bw = getWriter(table); + } catch (TableNotFoundException e) { + System.err.println("Error trying to connect to Accumulo table."); + e.printStackTrace(); + return Status.ERROR; + } + + try { + deleteRow(table, new Text(key), bw); + } catch (TableNotFoundException | MutationsRejectedException e) { + System.err.println("Error performing delete."); + e.printStackTrace(); + return Status.ERROR; + } catch (RuntimeException e) { + System.err.println("Error performing delete."); + e.printStackTrace(); + return Status.ERROR; + } + + return Status.OK; + } + + // These functions are adapted from 
RowOperations.java: + private void deleteRow(String table, Text row, BatchWriter bw) throws MutationsRejectedException, + TableNotFoundException { + // TODO Use a batchDeleter instead + deleteRow(getRow(table, row, null), bw); + } + + /** + * Deletes a row, given a Scanner of JUST that row. + */ + private void deleteRow(Scanner scanner, BatchWriter bw) throws MutationsRejectedException { + Mutation deleter = null; + // iterate through the keys + final Text row = new Text(); + final Text cf = new Text(); + final Text cq = new Text(); + for (Entry<Key, Value> entry : scanner) { + // create a mutation for the row + if (deleter == null) { + entry.getKey().getRow(row); + deleter = new Mutation(row); + } + entry.getKey().getColumnFamily(cf); + entry.getKey().getColumnQualifier(cq); + // the remove function adds the key with the delete flag set to true + deleter.putDelete(cf, cq); + } + + bw.addMutation(deleter); + } +} diff --git a/accumulo1.8/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java b/accumulo1.8/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..e38d200c774c03138f6ba8f642b2d2fe9bebc578 --- /dev/null +++ b/accumulo1.8/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java @@ -0,0 +1,22 @@ +/** + * Copyright (c) 2015 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ + +/** + * YCSB binding for <a href="https://accumulo.apache.org/">Apache Accumulo</a>. + */ +package com.yahoo.ycsb.db.accumulo; + diff --git a/accumulo1.8/src/test/java/com/yahoo/ycsb/db/accumulo/AccumuloTest.java b/accumulo1.8/src/test/java/com/yahoo/ycsb/db/accumulo/AccumuloTest.java new file mode 100644 index 0000000000000000000000000000000000000000..ce0b160f785ff94c6301d68a99b1e644289d7233 --- /dev/null +++ b/accumulo1.8/src/test/java/com/yahoo/ycsb/db/accumulo/AccumuloTest.java @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2016 YCSB contributors. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ + +package com.yahoo.ycsb.db.accumulo; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +import java.util.Map.Entry; +import java.util.Properties; + +import com.yahoo.ycsb.Workload; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.measurements.Measurements; +import com.yahoo.ycsb.workloads.CoreWorkload; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Use an Accumulo MiniCluster to test out basic workload operations with + * the Accumulo binding. 
+ */ +public class AccumuloTest { + private static final Logger LOG = LoggerFactory.getLogger(AccumuloTest.class); + private static final int INSERT_COUNT = 2000; + private static final int TRANSACTION_COUNT = 2000; + + @ClassRule + public static TemporaryFolder workingDir = new TemporaryFolder(); + @Rule + public TestName test = new TestName(); + + private static MiniAccumuloCluster cluster; + private static Properties properties; + private Workload workload; + private DB client; + private Properties workloadProps; + + private static boolean isWindows() { + final String os = System.getProperty("os.name"); + return os.startsWith("Windows"); + } + + @BeforeClass + public static void setup() throws Exception { + // Minicluster setup fails on Windows with an UnsatisfiedLinkError. + // Skip if windows. + assumeTrue(!isWindows()); + cluster = new MiniAccumuloCluster(workingDir.newFolder("accumulo").getAbsoluteFile(), "protectyaneck"); + LOG.debug("starting minicluster"); + cluster.start(); + LOG.debug("creating connection for admin operations."); + // set up the table and user + final Connector admin = cluster.getConnector("root", "protectyaneck"); + admin.tableOperations().create(CoreWorkload.TABLENAME_PROPERTY_DEFAULT); + admin.securityOperations().createLocalUser("ycsb", new PasswordToken("protectyaneck")); + admin.securityOperations().grantTablePermission("ycsb", CoreWorkload.TABLENAME_PROPERTY_DEFAULT, TablePermission.READ); + admin.securityOperations().grantTablePermission("ycsb", CoreWorkload.TABLENAME_PROPERTY_DEFAULT, TablePermission.WRITE); + + // set properties the binding will read + properties = new Properties(); + properties.setProperty("accumulo.zooKeepers", cluster.getZooKeepers()); + properties.setProperty("accumulo.instanceName", cluster.getInstanceName()); + properties.setProperty("accumulo.columnFamily", "family"); + properties.setProperty("accumulo.username", "ycsb"); + properties.setProperty("accumulo.password", "protectyaneck"); + // cut down the 
batch writer timeout so that writes will push through. + properties.setProperty("accumulo.batchWriterMaxLatency", "4"); + // set these explicitly to the defaults at the time we're compiled, since they'll be inlined in our class. + properties.setProperty(CoreWorkload.TABLENAME_PROPERTY, CoreWorkload.TABLENAME_PROPERTY_DEFAULT); + properties.setProperty(CoreWorkload.FIELD_COUNT_PROPERTY, CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT); + properties.setProperty(CoreWorkload.INSERT_ORDER_PROPERTY, "ordered"); + } + + @AfterClass + public static void clusterCleanup() throws Exception { + if (cluster != null) { + LOG.debug("shutting down minicluster"); + cluster.stop(); + cluster = null; + } + } + + @Before + public void client() throws Exception { + + LOG.debug("Loading workload properties for {}", test.getMethodName()); + workloadProps = new Properties(); + workloadProps.load(getClass().getResourceAsStream("/workloads/" + test.getMethodName())); + + for (String prop : properties.stringPropertyNames()) { + workloadProps.setProperty(prop, properties.getProperty(prop)); + } + + // TODO we need a better test rig for 'run this ycsb workload' + LOG.debug("initializing measurements and workload"); + Measurements.setProperties(workloadProps); + workload = new CoreWorkload(); + workload.init(workloadProps); + + LOG.debug("initializing client"); + client = new AccumuloClient(); + client.setProperties(workloadProps); + client.init(); + } + + @After + public void cleanup() throws Exception { + if (client != null) { + LOG.debug("cleaning up client"); + client.cleanup(); + client = null; + } + if (workload != null) { + LOG.debug("cleaning up workload"); + workload.cleanup(); + } + } + + @After + public void truncateTable() throws Exception { + if (cluster != null) { + LOG.debug("truncating table {}", CoreWorkload.TABLENAME_PROPERTY_DEFAULT); + final Connector admin = cluster.getConnector("root", "protectyaneck"); + 
admin.tableOperations().deleteRows(CoreWorkload.TABLENAME_PROPERTY_DEFAULT, null, null); + } + } + + @Test + public void workloada() throws Exception { + runWorkload(); + } + + @Test + public void workloadb() throws Exception { + runWorkload(); + } + + @Test + public void workloadc() throws Exception { + runWorkload(); + } + + @Test + public void workloadd() throws Exception { + runWorkload(); + } + + @Test + public void workloade() throws Exception { + runWorkload(); + } + + /** + * go through a workload cycle. + * <ol> + * <li>initialize thread-specific state + * <li>load the workload dataset + * <li>run workload transactions + * </ol> + */ + private void runWorkload() throws Exception { + final Object state = workload.initThread(workloadProps,0,0); + LOG.debug("load"); + for (int i = 0; i < INSERT_COUNT; i++) { + assertTrue("insert failed.", workload.doInsert(client, state)); + } + // Ensure we wait long enough for the batch writer to flush + // TODO accumulo client should be flushing per insert by default. + Thread.sleep(2000); + LOG.debug("verify number of cells"); + final Scanner scanner = cluster.getConnector("root", "protectyaneck").createScanner(CoreWorkload.TABLENAME_PROPERTY_DEFAULT, Authorizations.EMPTY); + int count = 0; + for (Entry<Key, Value> entry : scanner) { + count++; + } + assertEquals("Didn't get enough total cells.", (Integer.valueOf(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT) * INSERT_COUNT), count); + LOG.debug("run"); + for (int i = 0; i < TRANSACTION_COUNT; i++) { + assertTrue("transaction failed.", workload.doTransaction(client, state)); + } + } +} diff --git a/accumulo1.8/src/test/resources/log4j.properties b/accumulo1.8/src/test/resources/log4j.properties new file mode 100644 index 0000000000000000000000000000000000000000..2d48dce5a34914b9470a4948e4955b342bc03cbc --- /dev/null +++ b/accumulo1.8/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Copyright (c) 2015 YCSB contributors. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. See accompanying +# LICENSE file. +# + +# Root logger option +log4j.rootLogger=INFO, stderr + +log4j.appender.stderr=org.apache.log4j.ConsoleAppender +log4j.appender.stderr.target=System.err +log4j.appender.stderr.layout=org.apache.log4j.PatternLayout +log4j.appender.stderr.layout.conversionPattern=%d{yyyy/MM/dd HH:mm:ss} %-5p %c %x - %m%n + +# Suppress messages from ZooKeeper +log4j.logger.com.yahoo.ycsb.db.accumulo=DEBUG +log4j.logger.org.apache.zookeeper=ERROR +log4j.logger.org.apache.accumulo=WARN diff --git a/bin/bindings.properties b/bin/bindings.properties index 6486b0b5ac4fa23b32a3aa3dedb7714af97befb6..eecc6a2e0fcfbce18d5c2af077a0152f435fec9f 100644 --- a/bin/bindings.properties +++ b/bin/bindings.properties @@ -26,6 +26,9 @@ # use a dash with the version. (e.g. 
cassandra-7, cassandra-cql) # accumulo:com.yahoo.ycsb.db.accumulo.AccumuloClient +accumulo1.6:com.yahoo.ycsb.db.accumulo.AccumuloClient +accumulo1.7:com.yahoo.ycsb.db.accumulo.AccumuloClient +accumulo1.8:com.yahoo.ycsb.db.accumulo.AccumuloClient aerospike:com.yahoo.ycsb.db.AerospikeClient asynchbase:com.yahoo.ycsb.db.AsyncHBaseClient arangodb:com.yahoo.ycsb.db.ArangoDBClient diff --git a/bin/ycsb b/bin/ycsb index d84f9ec095f414d684d2d7738e2a527405a2d7d0..8626a473c1c853b96bae0f394ab2a1324f7a6450 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -52,6 +52,9 @@ COMMANDS = { DATABASES = { "accumulo" : "com.yahoo.ycsb.db.accumulo.AccumuloClient", + "accumulo1.6" : "com.yahoo.ycsb.db.accumulo.AccumuloClient", + "accumulo1.7" : "com.yahoo.ycsb.db.accumulo.AccumuloClient", + "accumulo1.8" : "com.yahoo.ycsb.db.accumulo.AccumuloClient", "aerospike" : "com.yahoo.ycsb.db.AerospikeClient", "arangodb" : "com.yahoo.ycsb.db.ArangoDBClient", "arangodb3" : "com.yahoo.ycsb.db.arangodb.ArangoDB3Client", @@ -247,6 +250,13 @@ def main(): # Classpath set up binding = args.database.split("-")[0] + if binding == "accumulo": + warn("The 'accumulo' client has been deprecated in favor of version " + "specific bindings. This name still maps to the binding for " + "Accumulo 1.6, which is named 'accumulo1.6'. This alias will " + "be removed in a future YCSB release.") + binding = "accumulo1.6" + if binding == "cassandra2": warn("The 'cassandra2-cql' client has been deprecated. It has been " "renamed to simply 'cassandra-cql'. This alias will be removed" diff --git a/bin/ycsb.bat b/bin/ycsb.bat index f8ea7b4e25671b35586e1b9309993d610e69a7ac..1996e02342266ad35580552be6ed18b053e0cf93 100644 --- a/bin/ycsb.bat +++ b/bin/ycsb.bat @@ -117,6 +117,12 @@ GOTO confAdded SET CLASSPATH=%YCSB_HOME%\conf :confAdded +@REM Accumulo deprecation message +IF NOT "%BINDING_DIR%" == "accumulo" GOTO notAliasAccumulo +echo [WARN] The 'accumulo' client has been deprecated in favor of version specific bindings. 
This name still maps to the binding for Accumulo 1.6, which is named 'accumulo1.6'. This alias will be removed in a future YCSB release. +SET BINDING_DIR=accumulo1.6 +:notAliasAccumulo + @REM Cassandra2 deprecation message IF NOT "%BINDING_DIR%" == "cassandra2" GOTO notAliasCassandra echo [WARN] The 'cassandra2-cql' client has been deprecated. It has been renamed to simply 'cassandra-cql'. This alias will be removed in the next YCSB release. diff --git a/bin/ycsb.sh b/bin/ycsb.sh index d62c22a4e4a8659e25817d44624e8f4782ff49ff..a157d6bb4f09cd3c344b1b62a1c09cd0df0a60b0 100755 --- a/bin/ycsb.sh +++ b/bin/ycsb.sh @@ -133,6 +133,15 @@ else CLASSPATH="$CLASSPATH:$YCSB_HOME/conf" fi +# Accumulo deprecation message +if [ "${BINDING_DIR}" = "accumulo" ] ; then + echo "[WARN] The 'accumulo' client has been deprecated in favor of version \ +specific bindings. This name still maps to the binding for \ +Accumulo 1.6, which is named 'accumulo1.6'. This alias will \ +be removed in a future YCSB release." + BINDING_DIR="accumulo1.6" +fi + # Cassandra2 deprecation message if [ "${BINDING_DIR}" = "cassandra2" ] ; then echo "[WARN] The 'cassandra2-cql' client has been deprecated. It has been \ diff --git a/distribution/pom.xml b/distribution/pom.xml index 8d477fce119e66861fdbf7624761b34db115981f..965decb1de14466e6c28ea68dba85ba0dc035b43 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -41,7 +41,17 @@ LICENSE file. 
</dependency> <dependency> <groupId>com.yahoo.ycsb</groupId> - <artifactId>accumulo-binding</artifactId> + <artifactId>accumulo1.6-binding</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>accumulo1.7-binding</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>accumulo1.8-binding</artifactId> <version>${project.version}</version> </dependency> <dependency> diff --git a/pom.xml b/pom.xml index e80175a3b2990c056f46d948bc44c12c8c3da1cc..6e7629df830360fb888a0da5c29c4cbb92b8c1c6 100644 --- a/pom.xml +++ b/pom.xml @@ -72,7 +72,9 @@ LICENSE file. <hbase098.version>0.98.14-hadoop2</hbase098.version> <hbase10.version>1.0.2</hbase10.version> <hbase12.version>1.2.5</hbase12.version> - <accumulo.version>1.6.0</accumulo.version> + <accumulo.1.6.version>1.6.6</accumulo.1.6.version> + <accumulo.1.7.version>1.7.3</accumulo.1.7.version> + <accumulo.1.8.version>1.8.1</accumulo.1.8.version> <cassandra.cql.version>3.0.0</cassandra.cql.version> <geode.version>1.2.0</geode.version> <azuredocumentdb.version>1.8.1</azuredocumentdb.version> @@ -110,7 +112,9 @@ LICENSE file. <module>binding-parent</module> <module>distribution</module> <!-- all the datastore bindings, lex sorted please --> - <module>accumulo</module> + <module>accumulo1.6</module> + <module>accumulo1.7</module> + <module>accumulo1.8</module> <module>aerospike</module> <module>arangodb</module> <module>arangodb3</module>