Commit f31b2339 authored by Sean Busbey


[accumulo, accumulo1.6, accumulo1.7, accumulo1.8] switch Accumulo to minor version specific bindings.

* break out accumulo modules
* add deprecation warning
* switch travis to use short hostname to get around long hostname error.
parent 600703cf
Showing with 1553 additions and 8 deletions
......@@ -24,6 +24,11 @@ jdk:
- oraclejdk8
- openjdk7
addons:
hosts:
- myshorthost
hostname: myshorthost
install: mvn install -q -DskipTests=true
script: mvn test -q
......@@ -35,5 +40,5 @@ services:
# - riak
# Use the Container based infrastructure.
sudo: false
# Can't use container based infra because of hosts/hostname
sudo: true
<!--
Copyright (c) 2015 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
## Quick Start
This section describes how to run YCSB on [Accumulo](https://accumulo.apache.org/).
### 1. Start Accumulo
See the [Accumulo Documentation](https://accumulo.apache.org/1.6/accumulo_user_manual.html#_installation)
for details on installing and running Accumulo.
Before running the YCSB test you must create the Accumulo table (see Step 3 below). Again, see the
[Accumulo Documentation](https://accumulo.apache.org/1.6/accumulo_user_manual.html#_basic_administration)
for details on basic table administration.
### 2. Set Up YCSB
Git clone YCSB and compile:
git clone http://github.com/brianfrankcooper/YCSB.git
cd YCSB
mvn -pl com.yahoo.ycsb:accumulo1.6-binding -am clean package
### 3. Create the Accumulo table
By default, YCSB uses a table named "usertable". Users must create this table before loading
data into Accumulo. For maximum Accumulo performance, the table should be pre-split. A simple
Ruby script, based on the HBase README, can generate adequate split points; tens of tablets per
tablet server is a good starting point. Unless otherwise specified, the following commands work
on any version of Accumulo.
$ echo 'num_splits = 20; puts (1..num_splits).map {|i| "user#{1000+i*(9999-1000)/num_splits}"}' | ruby > /tmp/splits.txt
$ accumulo shell -u <user> -p <password> -e "createtable usertable"
$ accumulo shell -u <user> -p <password> -e "addsplits -t usertable -sf /tmp/splits.txt"
$ accumulo shell -u <user> -p <password> -e "config -t usertable -s table.cache.block.enable=true"
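The Ruby one-liner above relies on integer arithmetic to spread split points evenly across the keyspace. As an illustration only (this class is not part of the binding), the same computation in Java:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitPoints {
  // Mirrors: (1..num_splits).map {|i| "user#{1000+i*(9999-1000)/num_splits}"}
  public static List<String> splits(int numSplits) {
    List<String> out = new ArrayList<>();
    for (int i = 1; i <= numSplits; i++) {
      // Integer division, matching the Ruby expression.
      out.add("user" + (1000 + i * (9999 - 1000) / numSplits));
    }
    return out;
  }

  public static void main(String[] args) {
    for (String split : splits(20)) {
      System.out.println(split);
    }
  }
}
```

With 20 splits this yields `user1449` through `user9999`, matching the contents of `/tmp/splits.txt`.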
Additionally, a few other configuration properties can increase performance. These
can be set on the Accumulo table via the shell after it is created. Setting the table durability
to `flush` relaxes the constraints on data durability during hard power outages (it avoids calls
to fsync). Accumulo defaults table compression to `gzip`, which is not particularly fast; `snappy`
is a faster and similarly efficient option. The mutation queue property controls how many writes
Accumulo will buffer in memory before performing a flush; it should be set relative to the amount
of JVM heap the TabletServers are given.
Please note that the `table.durability` and `tserver.total.mutation.queue.max` properties only
exist in Accumulo 1.7 and later. There are no concise replacements for them in earlier versions.
accumulo> config -s table.durability=flush
accumulo> config -s tserver.total.mutation.queue.max=256M
accumulo> config -t usertable -s table.file.compress.type=snappy
On repeated data loads, the following commands may be helpful to quickly reset the state of the table.
accumulo> createtable tmp --copy-splits usertable --copy-config usertable
accumulo> deletetable --force usertable
accumulo> renametable tmp usertable
accumulo> compact --wait -t accumulo.metadata
### 4. Load Data and Run Tests
Load the data:
./bin/ycsb load accumulo1.6 -s -P workloads/workloada \
-p accumulo.zooKeepers=localhost \
-p accumulo.columnFamily=ycsb \
-p accumulo.instanceName=ycsb \
-p accumulo.username=user \
-p accumulo.password=supersecret \
> outputLoad.txt
Run the workload test:
./bin/ycsb run accumulo1.6 -s -P workloads/workloada \
-p accumulo.zooKeepers=localhost \
-p accumulo.columnFamily=ycsb \
-p accumulo.instanceName=ycsb \
-p accumulo.username=user \
-p accumulo.password=supersecret \
> outputRun.txt
## Accumulo Configuration Parameters
- `accumulo.zooKeepers`
- The Accumulo cluster's [zookeeper servers](https://accumulo.apache.org/1.6/accumulo_user_manual.html#_connecting).
- Should contain a comma-separated list of hostname or hostname:port values.
- No default value.
- `accumulo.columnFamily`
- The name of the column family to use to store the data within the table.
- No default value.
- `accumulo.instanceName`
- Name of the Accumulo [instance](https://accumulo.apache.org/1.6/accumulo_user_manual.html#_connecting).
- No default value.
- `accumulo.username`
- The username to use when connecting to Accumulo.
- No default value.
- `accumulo.password`
- The password for the user connecting to Accumulo.
- No default value.
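Since none of the parameters above has a default, every one must be supplied for the binding to connect. A minimal sketch (the host and credential values are placeholders) of setting them programmatically, the same way `-p` flags do on the command line:

```java
import java.util.Properties;

public class AccumuloProps {
  // Builds the property set the Accumulo binding reads at init time.
  public static Properties example() {
    Properties p = new Properties();
    p.setProperty("accumulo.zooKeepers", "zoo1.example.com:2181,zoo2.example.com:2181");
    p.setProperty("accumulo.columnFamily", "ycsb");
    p.setProperty("accumulo.instanceName", "ycsb");
    p.setProperty("accumulo.username", "user");
    p.setProperty("accumulo.password", "supersecret");
    return p;
  }
}
```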
......@@ -25,8 +25,8 @@ LICENSE file.
<version>0.14.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>
<artifactId>accumulo-binding</artifactId>
<name>Accumulo DB Binding</name>
<artifactId>accumulo1.6-binding</artifactId>
<name>Accumulo 1.6 DB Binding</name>
<properties>
<!-- This should match up to the one from your Accumulo version -->
<hadoop.version>2.2.0</hadoop.version>
......@@ -37,7 +37,7 @@ LICENSE file.
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>${accumulo.version}</version>
<version>${accumulo.1.6.version}</version>
</dependency>
<!-- Needed for hadoop.io.Text :( -->
<dependency>
......@@ -66,7 +66,7 @@ LICENSE file.
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
<version>${accumulo.version}</version>
<version>${accumulo.1.6.version}</version>
<scope>test</scope>
</dependency>
<!-- needed directly only in test, but transitive
......
/**
* Copyright (c) 2011 YCSB++ project, 2014-2016 YCSB contributors.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.accumulo;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.CleanUp;
import org.apache.hadoop.io.Text;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
/**
* <a href="https://accumulo.apache.org/">Accumulo</a> binding for YCSB.
*/
public class AccumuloClient extends DB {
private ZooKeeperInstance inst;
private Connector connector;
private Text colFam = new Text("");
private byte[] colFamBytes = new byte[0];
private final ConcurrentHashMap<String, BatchWriter> writers = new ConcurrentHashMap<>();
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
CleanUp.shutdownNow();
}
});
}
@Override
public void init() throws DBException {
colFam = new Text(getProperties().getProperty("accumulo.columnFamily"));
colFamBytes = colFam.toString().getBytes(UTF_8);
inst = new ZooKeeperInstance(
getProperties().getProperty("accumulo.instanceName"),
getProperties().getProperty("accumulo.zooKeepers"));
try {
String principal = getProperties().getProperty("accumulo.username");
AuthenticationToken token =
new PasswordToken(getProperties().getProperty("accumulo.password"));
connector = inst.getConnector(principal, token);
} catch (AccumuloException | AccumuloSecurityException e) {
throw new DBException(e);
}
if (!(getProperties().getProperty("accumulo.pcFlag", "none").equals("none"))) {
System.err.println("Sorry, the ZK based producer/consumer implementation has been removed. " +
"Please see YCSB issue #416 for work on adding a general solution to coordinated work.");
}
}
@Override
public void cleanup() throws DBException {
try {
Iterator<BatchWriter> iterator = writers.values().iterator();
while (iterator.hasNext()) {
BatchWriter writer = iterator.next();
writer.close();
iterator.remove();
}
} catch (MutationsRejectedException e) {
throw new DBException(e);
}
}
/**
* Returns a BatchWriter for the given table, creating and caching one if
* none exists yet.
*
* @param table
* The table to write to.
*/
public BatchWriter getWriter(String table) throws TableNotFoundException {
// tl;dr We're paying a cost for the ConcurrentHashMap here to deal with the DB api.
// We know that YCSB is really only ever going to send us data for one table, so using
// a concurrent data structure is overkill (especially in such a hot code path).
// However, the impact seems to be relatively negligible in trivial local tests and it's
// "more correct" WRT to the API.
BatchWriter writer = writers.get(table);
if (null == writer) {
BatchWriter newWriter = createBatchWriter(table);
BatchWriter oldWriter = writers.putIfAbsent(table, newWriter);
// Someone beat us to creating a BatchWriter for this table; use theirs.
if (null != oldWriter) {
try {
// Make sure to clean up our new batchwriter!
newWriter.close();
} catch (MutationsRejectedException e) {
throw new RuntimeException(e);
}
writer = oldWriter;
} else {
writer = newWriter;
}
}
return writer;
}
/**
* Creates a BatchWriter with the expected configuration.
*
* @param table The table to write to
*/
private BatchWriter createBatchWriter(String table) throws TableNotFoundException {
BatchWriterConfig bwc = new BatchWriterConfig();
bwc.setMaxLatency(
Long.parseLong(getProperties()
.getProperty("accumulo.batchWriterMaxLatency", "30000")),
TimeUnit.MILLISECONDS);
bwc.setMaxMemory(Long.parseLong(
getProperties().getProperty("accumulo.batchWriterSize", "100000")));
final String numThreadsValue = getProperties().getProperty("accumulo.batchWriterThreads");
// Try to saturate the client machine.
int numThreads = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
if (null != numThreadsValue) {
numThreads = Integer.parseInt(numThreadsValue);
}
System.err.println("Using " + numThreads + " threads to write data");
bwc.setMaxWriteThreads(numThreads);
return connector.createBatchWriter(table, bwc);
}
/**
* Gets a scanner from Accumulo over one row.
*
* @param row the row to scan
* @param fields the set of columns to scan
* @return an Accumulo {@link Scanner} bound to the given row and columns
*/
private Scanner getRow(String table, Text row, Set<String> fields) throws TableNotFoundException {
Scanner scanner = connector.createScanner(table, Authorizations.EMPTY);
scanner.setRange(new Range(row));
if (fields != null) {
for (String field : fields) {
scanner.fetchColumn(colFam, new Text(field));
}
}
return scanner;
}
@Override
public Status read(String table, String key, Set<String> fields,
Map<String, ByteIterator> result) {
Scanner scanner = null;
try {
// Restrict the scan to the requested fields (all columns when fields is null).
scanner = getRow(table, new Text(key), fields);
// Pick out the results we care about.
final Text cq = new Text();
for (Entry<Key, Value> entry : scanner) {
entry.getKey().getColumnQualifier(cq);
Value v = entry.getValue();
byte[] buf = v.get();
result.put(cq.toString(),
new ByteArrayByteIterator(buf));
}
} catch (Exception e) {
System.err.println("Error trying to read Accumulo table " + table + " " + key);
e.printStackTrace();
return Status.ERROR;
} finally {
if (null != scanner) {
scanner.close();
}
}
return Status.OK;
}
@Override
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
// Just make the end 'infinity' and only read as much as we need.
Scanner scanner = null;
try {
scanner = connector.createScanner(table, Authorizations.EMPTY);
scanner.setRange(new Range(new Text(startkey), null));
// Have Accumulo send us complete rows, serialized in a single Key-Value pair
IteratorSetting cfg = new IteratorSetting(100, WholeRowIterator.class);
scanner.addScanIterator(cfg);
// If no fields are provided, we assume one column/row.
if (fields != null) {
// And add each of them as fields we want.
for (String field : fields) {
scanner.fetchColumn(colFam, new Text(field));
}
}
int count = 0;
for (Entry<Key, Value> entry : scanner) {
// Deserialize the row
SortedMap<Key, Value> row = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue());
HashMap<String, ByteIterator> rowData;
if (null != fields) {
rowData = new HashMap<>(fields.size());
} else {
rowData = new HashMap<>();
}
result.add(rowData);
// Parse the data in the row, avoid unnecessary Text object creation
final Text cq = new Text();
for (Entry<Key, Value> rowEntry : row.entrySet()) {
rowEntry.getKey().getColumnQualifier(cq);
rowData.put(cq.toString(), new ByteArrayByteIterator(rowEntry.getValue().get()));
}
if (++count == recordcount) { // Stop once we've read the requested number of rows.
break;
}
}
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table.");
e.printStackTrace();
return Status.ERROR;
} catch (IOException e) {
System.err.println("Error deserializing data from Accumulo.");
e.printStackTrace();
return Status.ERROR;
} finally {
if (null != scanner) {
scanner.close();
}
}
return Status.OK;
}
@Override
public Status update(String table, String key,
Map<String, ByteIterator> values) {
BatchWriter bw = null;
try {
bw = getWriter(table);
} catch (TableNotFoundException e) {
System.err.println("Error opening batch writer to Accumulo table " + table);
e.printStackTrace();
return Status.ERROR;
}
Mutation mutInsert = new Mutation(key.getBytes(UTF_8));
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
mutInsert.put(colFamBytes, entry.getKey().getBytes(UTF_8), entry.getValue().toArray());
}
try {
bw.addMutation(mutInsert);
} catch (MutationsRejectedException e) {
System.err.println("Error performing update.");
e.printStackTrace();
return Status.ERROR;
}
return Status.BATCHED_OK;
}
@Override
public Status insert(String t, String key,
Map<String, ByteIterator> values) {
return update(t, key, values);
}
@Override
public Status delete(String table, String key) {
BatchWriter bw;
try {
bw = getWriter(table);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table.");
e.printStackTrace();
return Status.ERROR;
}
try {
deleteRow(table, new Text(key), bw);
} catch (TableNotFoundException | MutationsRejectedException e) {
System.err.println("Error performing delete.");
e.printStackTrace();
return Status.ERROR;
} catch (RuntimeException e) {
System.err.println("Error performing delete.");
e.printStackTrace();
return Status.ERROR;
}
return Status.OK;
}
// These functions are adapted from RowOperations.java:
private void deleteRow(String table, Text row, BatchWriter bw) throws MutationsRejectedException,
TableNotFoundException {
// TODO Use a batchDeleter instead
deleteRow(getRow(table, row, null), bw);
}
/**
* Deletes a row, given a Scanner of JUST that row.
*/
private void deleteRow(Scanner scanner, BatchWriter bw) throws MutationsRejectedException {
Mutation deleter = null;
// iterate through the keys
final Text row = new Text();
final Text cf = new Text();
final Text cq = new Text();
for (Entry<Key, Value> entry : scanner) {
// create a mutation for the row
if (deleter == null) {
entry.getKey().getRow(row);
deleter = new Mutation(row);
}
entry.getKey().getColumnFamily(cf);
entry.getKey().getColumnQualifier(cq);
// the remove function adds the key with the delete flag set to true
deleter.putDelete(cf, cq);
}
// The scanner may have matched nothing; only submit a mutation if one was built.
if (deleter != null) {
bw.addMutation(deleter);
}
}
}
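The `getWriter` method above uses the create/`putIfAbsent`/discard idiom its comments describe: build a candidate, attempt to publish it, and close the candidate if another thread won the race. Isolated as a standalone sketch (with `StringBuilder` standing in for `BatchWriter`, since closing is the only Accumulo-specific step):

```java
import java.util.concurrent.ConcurrentHashMap;

public class WriterCache {
  private final ConcurrentHashMap<String, StringBuilder> cache = new ConcurrentHashMap<>();

  public StringBuilder get(String table) {
    StringBuilder writer = cache.get(table);
    if (writer == null) {
      StringBuilder created = new StringBuilder(table);
      // Atomically publish; returns the existing value if another thread won.
      StringBuilder raced = cache.putIfAbsent(table, created);
      // On a lost race, discard our candidate and use the published one.
      writer = (raced != null) ? raced : created;
    }
    return writer;
  }
}
```

Every caller observes exactly one cached instance per table, which is why the binding can safely share its writers across operations.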
#
# Copyright (c) 2015 YCSB contributors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License. See accompanying
# LICENSE file.
#
# Root logger option
log4j.rootLogger=INFO, stderr
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.target=System.err
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.conversionPattern=%d{yyyy/MM/dd HH:mm:ss} %-5p %c %x - %m%n
# Suppress messages from ZooKeeper
log4j.logger.com.yahoo.ycsb.db.accumulo=INFO
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.apache.accumulo=WARN
......@@ -75,7 +75,7 @@ On repeated data loads, the following commands may be helpful to re-set the stat
Load the data:
./bin/ycsb load accumulo -s -P workloads/workloada \
./bin/ycsb load accumulo1.7 -s -P workloads/workloada \
-p accumulo.zooKeepers=localhost \
-p accumulo.columnFamily=ycsb \
-p accumulo.instanceName=ycsb \
......@@ -85,7 +85,7 @@ Load the data:
Run the workload test:
./bin/ycsb run accumulo -s -P workloads/workloada \
./bin/ycsb run accumulo1.7 -s -P workloads/workloada \
-p accumulo.zooKeepers=localhost \
-p accumulo.columnFamily=ycsb \
-p accumulo.instanceName=ycsb \
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2011 YCSB++ project, 2014 - 2016 YCSB contributors.
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.14.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>
<artifactId>accumulo1.7-binding</artifactId>
<name>Accumulo 1.7 DB Binding</name>
<properties>
<!-- This should match up to the one from your Accumulo version -->
<hadoop.version>2.2.0</hadoop.version>
<!-- Tests do not run on jdk9 -->
<skipJDK9Tests>true</skipJDK9Tests>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>${accumulo.1.7.version}</version>
</dependency>
<!-- Needed for hadoop.io.Text :( -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
<version>${accumulo.1.7.version}</version>
<scope>test</scope>
</dependency>
<!-- needed directly only in test, but transitive
at runtime for accumulo, hadoop, and thrift. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.13</version>
</dependency>
</dependencies>
<build>
<testResources>
<testResource>
<directory>../workloads</directory>
<targetPath>workloads</targetPath>
</testResource>
<testResource>
<directory>src/test/resources</directory>
</testResource>
</testResources>
</build>
</project>
# Copyright 2014 Cloudera, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License. See accompanying
# LICENSE file.
#
# Sample Accumulo configuration properties
#
# You may either set properties here or via the command line.
#
# This will influence the keys we write
accumulo.columnFamily=YCSB
# This should be set based on your Accumulo cluster
#accumulo.instanceName=ExampleInstance
# Comma separated list of host:port tuples for the ZooKeeper quorum used
# by your Accumulo cluster
#accumulo.zooKeepers=zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181
# This user will need permissions on the table YCSB works against
#accumulo.username=ycsb
#accumulo.password=protectyaneck
# Controls how long our client writer will wait to buffer more data
# measured in milliseconds
accumulo.batchWriterMaxLatency=30000
# Controls how much data our client will attempt to buffer before sending
# measured in bytes
accumulo.batchWriterSize=100000
# Controls how many worker threads our client will use to parallelize writes
accumulo.batchWriterThreads=1
/**
* Copyright (c) 2015 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
* YCSB binding for <a href="https://accumulo.apache.org/">Apache Accumulo</a>.
*/
package com.yahoo.ycsb.db.accumulo;
/*
* Copyright (c) 2016 YCSB contributors.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.accumulo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.util.Map.Entry;
import java.util.Properties;
import com.yahoo.ycsb.Workload;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.measurements.Measurements;
import com.yahoo.ycsb.workloads.CoreWorkload;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Use an Accumulo MiniCluster to test out basic workload operations with
* the Accumulo binding.
*/
public class AccumuloTest {
private static final Logger LOG = LoggerFactory.getLogger(AccumuloTest.class);
private static final int INSERT_COUNT = 2000;
private static final int TRANSACTION_COUNT = 2000;
@ClassRule
public static TemporaryFolder workingDir = new TemporaryFolder();
@Rule
public TestName test = new TestName();
private static MiniAccumuloCluster cluster;
private static Properties properties;
private Workload workload;
private DB client;
private Properties workloadProps;
private static boolean isWindows() {
final String os = System.getProperty("os.name");
return os.startsWith("Windows");
}
@BeforeClass
public static void setup() throws Exception {
// Minicluster setup fails on Windows with an UnsatisfiedLinkError.
// Skip if windows.
assumeTrue(!isWindows());
cluster = new MiniAccumuloCluster(workingDir.newFolder("accumulo").getAbsoluteFile(), "protectyaneck");
LOG.debug("starting minicluster");
cluster.start();
LOG.debug("creating connection for admin operations.");
// set up the table and user
final Connector admin = cluster.getConnector("root", "protectyaneck");
admin.tableOperations().create(CoreWorkload.TABLENAME_PROPERTY_DEFAULT);
admin.securityOperations().createLocalUser("ycsb", new PasswordToken("protectyaneck"));
admin.securityOperations().grantTablePermission("ycsb", CoreWorkload.TABLENAME_PROPERTY_DEFAULT, TablePermission.READ);
admin.securityOperations().grantTablePermission("ycsb", CoreWorkload.TABLENAME_PROPERTY_DEFAULT, TablePermission.WRITE);
// set properties the binding will read
properties = new Properties();
properties.setProperty("accumulo.zooKeepers", cluster.getZooKeepers());
properties.setProperty("accumulo.instanceName", cluster.getInstanceName());
properties.setProperty("accumulo.columnFamily", "family");
properties.setProperty("accumulo.username", "ycsb");
properties.setProperty("accumulo.password", "protectyaneck");
// cut down the batch writer timeout so that writes will push through.
properties.setProperty("accumulo.batchWriterMaxLatency", "4");
// set these explicitly to the defaults at the time we're compiled, since they'll be inlined in our class.
properties.setProperty(CoreWorkload.TABLENAME_PROPERTY, CoreWorkload.TABLENAME_PROPERTY_DEFAULT);
properties.setProperty(CoreWorkload.FIELD_COUNT_PROPERTY, CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT);
properties.setProperty(CoreWorkload.INSERT_ORDER_PROPERTY, "ordered");
}
@AfterClass
public static void clusterCleanup() throws Exception {
if (cluster != null) {
LOG.debug("shutting down minicluster");
cluster.stop();
cluster = null;
}
}
@Before
public void client() throws Exception {
LOG.debug("Loading workload properties for {}", test.getMethodName());
workloadProps = new Properties();
workloadProps.load(getClass().getResourceAsStream("/workloads/" + test.getMethodName()));
for (String prop : properties.stringPropertyNames()) {
workloadProps.setProperty(prop, properties.getProperty(prop));
}
// TODO we need a better test rig for 'run this ycsb workload'
LOG.debug("initializing measurements and workload");
Measurements.setProperties(workloadProps);
workload = new CoreWorkload();
workload.init(workloadProps);
LOG.debug("initializing client");
client = new AccumuloClient();
client.setProperties(workloadProps);
client.init();
}
@After
public void cleanup() throws Exception {
if (client != null) {
LOG.debug("cleaning up client");
client.cleanup();
client = null;
}
if (workload != null) {
LOG.debug("cleaning up workload");
workload.cleanup();
}
}
@After
public void truncateTable() throws Exception {
if (cluster != null) {
LOG.debug("truncating table {}", CoreWorkload.TABLENAME_PROPERTY_DEFAULT);
final Connector admin = cluster.getConnector("root", "protectyaneck");
admin.tableOperations().deleteRows(CoreWorkload.TABLENAME_PROPERTY_DEFAULT, null, null);
}
}
@Test
public void workloada() throws Exception {
runWorkload();
}
@Test
public void workloadb() throws Exception {
runWorkload();
}
@Test
public void workloadc() throws Exception {
runWorkload();
}
@Test
public void workloadd() throws Exception {
runWorkload();
}
@Test
public void workloade() throws Exception {
runWorkload();
}
/**
* go through a workload cycle.
* <ol>
* <li>initialize thread-specific state
* <li>load the workload dataset
* <li>run workload transactions
* </ol>
*/
private void runWorkload() throws Exception {
final Object state = workload.initThread(workloadProps,0,0);
LOG.debug("load");
for (int i = 0; i < INSERT_COUNT; i++) {
assertTrue("insert failed.", workload.doInsert(client, state));
}
// Ensure we wait long enough for the batch writer to flush
// TODO accumulo client should be flushing per insert by default.
Thread.sleep(2000);
LOG.debug("verify number of cells");
final Scanner scanner = cluster.getConnector("root", "protectyaneck").createScanner(CoreWorkload.TABLENAME_PROPERTY_DEFAULT, Authorizations.EMPTY);
int count = 0;
for (Entry<Key, Value> entry : scanner) {
count++;
}
assertEquals("Didn't get enough total cells.", (Integer.valueOf(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT) * INSERT_COUNT), count);
LOG.debug("run");
for (int i = 0; i < TRANSACTION_COUNT; i++) {
assertTrue("transaction failed.", workload.doTransaction(client, state));
}
}
}
<!--
Copyright (c) 2015 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
## Quick Start
This section describes how to run YCSB on [Accumulo](https://accumulo.apache.org/).
### 1. Start Accumulo
See the [Accumulo Documentation](https://accumulo.apache.org/1.8/accumulo_user_manual.html#_installation)
for details on installing and running Accumulo.
Before running the YCSB test you must create the Accumulo table. Again see the
[Accumulo Documentation](https://accumulo.apache.org/1.8/accumulo_user_manual.html#_basic_administration)
for details. The default table name is `ycsb`.
### 2. Set Up YCSB
Git clone YCSB and compile:
git clone http://github.com/brianfrankcooper/YCSB.git
cd YCSB
mvn -pl com.yahoo.ycsb:accumulo1.8-binding -am clean package
### 3. Create the Accumulo table
By default, YCSB uses a table named `usertable`. You must create this table before loading
data into Accumulo. For maximum Accumulo performance, the table should be pre-split. A simple
Ruby script, based on the HBase README, can generate adequate split points; tens of tablets per
TabletServer is a good starting point. Unless otherwise specified, the following commands should
work on any version of Accumulo.
$ echo 'num_splits = 20; puts (1..num_splits).map {|i| "user#{1000+i*(9999-1000)/num_splits}"}' | ruby > /tmp/splits.txt
$ accumulo shell -u <user> -p <password> -e "createtable usertable"
$ accumulo shell -u <user> -p <password> -e "addsplits -t usertable -sf /tmp/splits.txt"
$ accumulo shell -u <user> -p <password> -e "config -t usertable -s table.cache.block.enable=true"
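The Ruby one-liner above simply interpolates 20 evenly spaced keys across the default YCSB key space (`user1000`–`user9999`). If Ruby is not handy, the same split file can be generated with a short Python sketch (the values match the Ruby output):

```python
# Generate 20 evenly spaced split points across the default YCSB
# key space (user1000 .. user9999), one per line, suitable for the
# shell's `addsplits -sf` option.
num_splits = 20
splits = ["user%d" % (1000 + i * (9999 - 1000) // num_splits)
          for i in range(1, num_splits + 1)]
with open("/tmp/splits.txt", "w") as f:
    f.write("\n".join(splits) + "\n")
```

With `/tmp/splits.txt` in place, the `addsplits` command above works unchanged.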
Additionally, some other configuration properties can increase performance. These
can be set on the Accumulo table via the shell after it is created. Setting the table durability
to `flush` relaxes data durability guarantees during hard power outages (it avoids calls
to fsync). Accumulo defaults table compression to `gzip`, which is not particularly fast; `snappy`
is a faster and similarly efficient option. The mutation queue property controls how many writes
Accumulo will buffer in memory before performing a flush; it should be set relative
to the amount of JVM heap the TabletServers are given.
Please note that the `table.durability` and `tserver.total.mutation.queue.max` properties only
exist in Accumulo 1.7 and later. There are no concise replacements for them in earlier versions.
accumulo> config -s table.durability=flush
accumulo> config -s tserver.total.mutation.queue.max=256M
accumulo> config -t usertable -s table.file.compress.type=snappy
On repeated data loads, the following commands may be helpful for quickly resetting the table's state.
accumulo> createtable tmp --copy-splits usertable --copy-config usertable
accumulo> deletetable --force usertable
accumulo> renametable tmp usertable
accumulo> compact --wait -t accumulo.metadata
### 4. Load Data and Run Tests
Load the data:
./bin/ycsb load accumulo1.8 -s -P workloads/workloada \
-p accumulo.zooKeepers=localhost \
-p accumulo.columnFamily=ycsb \
-p accumulo.instanceName=ycsb \
-p accumulo.username=user \
-p accumulo.password=supersecret \
> outputLoad.txt
Run the workload test:
./bin/ycsb run accumulo1.8 -s -P workloads/workloada \
-p accumulo.zooKeepers=localhost \
-p accumulo.columnFamily=ycsb \
-p accumulo.instanceName=ycsb \
-p accumulo.username=user \
-p accumulo.password=supersecret \
> outputRun.txt
## Accumulo Configuration Parameters
- `accumulo.zooKeepers`
- The Accumulo cluster's [zookeeper servers](https://accumulo.apache.org/1.8/accumulo_user_manual.html#_connecting).
- Should contain a comma separated list of hostname or hostname:port values.
- No default value.
- `accumulo.columnFamily`
- The name of the column family to use to store the data within the table.
- No default value.
- `accumulo.instanceName`
- Name of the Accumulo [instance](https://accumulo.apache.org/1.8/accumulo_user_manual.html#_connecting).
- No default value.
- `accumulo.username`
- The username to use when connecting to Accumulo.
- No default value.
- `accumulo.password`
- The password for the user connecting to Accumulo.
- No default value.
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2011 YCSB++ project, 2014 - 2016 YCSB contributors.
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.14.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>
<artifactId>accumulo1.8-binding</artifactId>
<name>Accumulo 1.8 DB Binding</name>
<properties>
<!-- This should match up to the one from your Accumulo version -->
<hadoop.version>2.6.4</hadoop.version>
<!-- Tests do not run on jdk9 -->
<skipJDK9Tests>true</skipJDK9Tests>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>${accumulo.1.8.version}</version>
</dependency>
<!-- Needed for hadoop.io.Text :( -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
<version>${accumulo.1.8.version}</version>
<scope>test</scope>
</dependency>
<!-- needed directly only in test, but transitive
at runtime for accumulo, hadoop, and thrift. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.13</version>
</dependency>
</dependencies>
<build>
<testResources>
<testResource>
<directory>../workloads</directory>
<targetPath>workloads</targetPath>
</testResource>
<testResource>
<directory>src/test/resources</directory>
</testResource>
</testResources>
</build>
</project>
# Copyright 2014 Cloudera, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License. See accompanying
# LICENSE file.
#
# Sample Accumulo configuration properties
#
# You may either set properties here or via the command line.
#
# This will influence the keys we write
accumulo.columnFamily=YCSB
# This should be set based on your Accumulo cluster
#accumulo.instanceName=ExampleInstance
# Comma separated list of host:port tuples for the ZooKeeper quorum used
# by your Accumulo cluster
#accumulo.zooKeepers=zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181
# This user will need permissions on the table YCSB works against
#accumulo.username=ycsb
#accumulo.password=protectyaneck
# Controls how long our client writer will wait to buffer more data
# measured in milliseconds
accumulo.batchWriterMaxLatency=30000
# Controls how much data our client will attempt to buffer before sending
# measured in bytes
accumulo.batchWriterSize=100000
# Controls how many worker threads our client will use to parallelize writes
accumulo.batchWriterThreads=1
/**
* Copyright (c) 2011 YCSB++ project, 2014-2016 YCSB contributors.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.accumulo;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.CleanUp;
import org.apache.hadoop.io.Text;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
/**
* <a href="https://accumulo.apache.org/">Accumulo</a> binding for YCSB.
*/
public class AccumuloClient extends DB {
private ZooKeeperInstance inst;
private Connector connector;
private Text colFam = new Text("");
private byte[] colFamBytes = new byte[0];
private final ConcurrentHashMap<String, BatchWriter> writers = new ConcurrentHashMap<>();
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
CleanUp.shutdownNow();
}
});
}
@Override
public void init() throws DBException {
colFam = new Text(getProperties().getProperty("accumulo.columnFamily"));
colFamBytes = colFam.toString().getBytes(UTF_8);
inst = new ZooKeeperInstance(new ClientConfiguration()
.withInstance(getProperties().getProperty("accumulo.instanceName"))
.withZkHosts(getProperties().getProperty("accumulo.zooKeepers")));
try {
String principal = getProperties().getProperty("accumulo.username");
AuthenticationToken token =
new PasswordToken(getProperties().getProperty("accumulo.password"));
connector = inst.getConnector(principal, token);
} catch (AccumuloException | AccumuloSecurityException e) {
throw new DBException(e);
}
if (!(getProperties().getProperty("accumulo.pcFlag", "none").equals("none"))) {
System.err.println("Sorry, the ZK based producer/consumer implementation has been removed. " +
"Please see YCSB issue #416 for work on adding a general solution to coordinated work.");
}
}
@Override
public void cleanup() throws DBException {
try {
Iterator<BatchWriter> iterator = writers.values().iterator();
while (iterator.hasNext()) {
BatchWriter writer = iterator.next();
writer.close();
iterator.remove();
}
} catch (MutationsRejectedException e) {
throw new DBException(e);
}
}
/**
* Returns a BatchWriter for the given table, creating and caching one
* if none exists yet.
*
* @param table
* The table to write to.
*/
public BatchWriter getWriter(String table) throws TableNotFoundException {
// tl;dr We're paying a cost for the ConcurrentHashMap here to deal with the DB api.
// We know that YCSB is really only ever going to send us data for one table, so using
// a concurrent data structure is overkill (especially in such a hot code path).
// However, the impact seems to be relatively negligible in trivial local tests and it's
// "more correct" with respect to the API.
BatchWriter writer = writers.get(table);
if (null == writer) {
BatchWriter newWriter = createBatchWriter(table);
BatchWriter oldWriter = writers.putIfAbsent(table, newWriter);
// Someone beat us to creating a BatchWriter for this table, use their BatchWriters
if (null != oldWriter) {
try {
// Make sure to clean up our new batchwriter!
newWriter.close();
} catch (MutationsRejectedException e) {
throw new RuntimeException(e);
}
writer = oldWriter;
} else {
writer = newWriter;
}
}
return writer;
}
/**
* Creates a BatchWriter with the expected configuration.
*
* @param table The table to write to
*/
private BatchWriter createBatchWriter(String table) throws TableNotFoundException {
BatchWriterConfig bwc = new BatchWriterConfig();
bwc.setMaxLatency(
Long.parseLong(getProperties()
.getProperty("accumulo.batchWriterMaxLatency", "30000")),
TimeUnit.MILLISECONDS);
bwc.setMaxMemory(Long.parseLong(
getProperties().getProperty("accumulo.batchWriterSize", "100000")));
final String numThreadsValue = getProperties().getProperty("accumulo.batchWriterThreads");
// Try to saturate the client machine.
int numThreads = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
if (null != numThreadsValue) {
numThreads = Integer.parseInt(numThreadsValue);
}
System.err.println("Using " + numThreads + " threads to write data");
bwc.setMaxWriteThreads(numThreads);
return connector.createBatchWriter(table, bwc);
}
/**
* Gets a scanner from Accumulo over one row.
*
* @param table the table to scan
* @param row the row to scan
* @param fields the set of columns to scan
* @return an Accumulo {@link Scanner} bound to the given row and columns
*/
private Scanner getRow(String table, Text row, Set<String> fields) throws TableNotFoundException {
Scanner scanner = connector.createScanner(table, Authorizations.EMPTY);
scanner.setRange(new Range(row));
if (fields != null) {
for (String field : fields) {
scanner.fetchColumn(colFam, new Text(field));
}
}
return scanner;
}
@Override
public Status read(String table, String key, Set<String> fields,
Map<String, ByteIterator> result) {
Scanner scanner = null;
try {
scanner = getRow(table, new Text(key), null);
// Pick out the results we care about.
final Text cq = new Text();
for (Entry<Key, Value> entry : scanner) {
entry.getKey().getColumnQualifier(cq);
Value v = entry.getValue();
byte[] buf = v.get();
result.put(cq.toString(),
new ByteArrayByteIterator(buf));
}
} catch (Exception e) {
System.err.println("Error trying to read Accumulo table " + table + " " + key);
e.printStackTrace();
return Status.ERROR;
} finally {
if (null != scanner) {
scanner.close();
}
}
return Status.OK;
}
@Override
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
// Just make the end 'infinity' and only read as much as we need.
Scanner scanner = null;
try {
scanner = connector.createScanner(table, Authorizations.EMPTY);
scanner.setRange(new Range(new Text(startkey), null));
// Have Accumulo send us complete rows, serialized in a single Key-Value pair
IteratorSetting cfg = new IteratorSetting(100, WholeRowIterator.class);
scanner.addScanIterator(cfg);
// If no fields are provided, we assume one column/row.
if (fields != null) {
// And add each of them as fields we want.
for (String field : fields) {
scanner.fetchColumn(colFam, new Text(field));
}
}
int count = 0;
for (Entry<Key, Value> entry : scanner) {
// Deserialize the row
SortedMap<Key, Value> row = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue());
HashMap<String, ByteIterator> rowData;
if (null != fields) {
rowData = new HashMap<>(fields.size());
} else {
rowData = new HashMap<>();
}
result.add(rowData);
// Parse the data in the row, avoid unnecessary Text object creation
final Text cq = new Text();
for (Entry<Key, Value> rowEntry : row.entrySet()) {
rowEntry.getKey().getColumnQualifier(cq);
rowData.put(cq.toString(), new ByteArrayByteIterator(rowEntry.getValue().get()));
}
if (++count == recordcount) { // Done reading the requested number of rows.
break;
}
}
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table.");
e.printStackTrace();
return Status.ERROR;
} catch (IOException e) {
System.err.println("Error deserializing data from Accumulo.");
e.printStackTrace();
return Status.ERROR;
} finally {
if (null != scanner) {
scanner.close();
}
}
return Status.OK;
}
@Override
public Status update(String table, String key,
Map<String, ByteIterator> values) {
BatchWriter bw = null;
try {
bw = getWriter(table);
} catch (TableNotFoundException e) {
System.err.println("Error opening batch writer to Accumulo table " + table);
e.printStackTrace();
return Status.ERROR;
}
Mutation mutInsert = new Mutation(key.getBytes(UTF_8));
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
mutInsert.put(colFamBytes, entry.getKey().getBytes(UTF_8), entry.getValue().toArray());
}
try {
bw.addMutation(mutInsert);
} catch (MutationsRejectedException e) {
System.err.println("Error performing update.");
e.printStackTrace();
return Status.ERROR;
}
return Status.BATCHED_OK;
}
@Override
public Status insert(String t, String key,
Map<String, ByteIterator> values) {
return update(t, key, values);
}
@Override
public Status delete(String table, String key) {
BatchWriter bw;
try {
bw = getWriter(table);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table.");
e.printStackTrace();
return Status.ERROR;
}
try {
deleteRow(table, new Text(key), bw);
} catch (TableNotFoundException | MutationsRejectedException e) {
System.err.println("Error performing delete.");
e.printStackTrace();
return Status.ERROR;
} catch (RuntimeException e) {
System.err.println("Error performing delete.");
e.printStackTrace();
return Status.ERROR;
}
return Status.OK;
}
// These functions are adapted from RowOperations.java:
private void deleteRow(String table, Text row, BatchWriter bw) throws MutationsRejectedException,
TableNotFoundException {
// TODO Use a batchDeleter instead
deleteRow(getRow(table, row, null), bw);
}
/**
* Deletes a row, given a Scanner of JUST that row.
*/
private void deleteRow(Scanner scanner, BatchWriter bw) throws MutationsRejectedException {
Mutation deleter = null;
// iterate through the keys
final Text row = new Text();
final Text cf = new Text();
final Text cq = new Text();
for (Entry<Key, Value> entry : scanner) {
// create a mutation for the row
if (deleter == null) {
entry.getKey().getRow(row);
deleter = new Mutation(row);
}
entry.getKey().getColumnFamily(cf);
entry.getKey().getColumnQualifier(cq);
// the remove function adds the key with the delete flag set to true
deleter.putDelete(cf, cq);
}
// Guard against an empty row: addMutation(null) would throw.
if (deleter != null) {
bw.addMutation(deleter);
}
}
}
/**
* Copyright (c) 2015 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
* YCSB binding for <a href="https://accumulo.apache.org/">Apache Accumulo</a>.
*/
package com.yahoo.ycsb.db.accumulo;