Commit ddde8e3c authored by Josh Elser, committed by Kevin Risden

[accumulo] A general "refresh" to the Accumulo binding (#947)

* Expand on the README, covering table creation and best practices for table config
* Avoid unnecessary Text object creation (reuse Text objects in loops, prefer byte[] over Text where possible); a sketch follows this list
* Use a ConcurrentHashMap to better match the DB API
* Fix error messages and always call printStackTrace() on exceptions
* Use BATCHED_OK instead of OK in insert() (more correct)
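A minimal, hypothetical sketch of the Text-reuse pattern the second bullet describes: Accumulo's
Key exposes fill-in accessors (getRow(Text), getColumnQualifier(Text)) that refill a caller-owned
Text rather than allocating a fresh one per entry. The class name and scanner setup below are
illustrative only, not part of this commit.

import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.Map.Entry;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;

public final class TextReuseSketch {
  // Iterate a table, reusing a single Text for every column qualifier
  // instead of allocating one per Key-Value pair.
  static void dumpQualifiers(Connector connector, String table) throws TableNotFoundException {
    Scanner scanner = connector.createScanner(table, Authorizations.EMPTY);
    final Text cq = new Text(); // allocated once, refilled on each iteration
    for (Entry<Key, Value> entry : scanner) {
      entry.getKey().getColumnQualifier(cq); // fills cq in place; no new Text
      System.out.println(cq + " -> " + new String(entry.getValue().get(), UTF_8));
    }
    scanner.close();
  }
}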
parent 78c3cfae
@@ -36,7 +36,42 @@ Git clone YCSB and compile:
cd YCSB
mvn -pl com.yahoo.ycsb:accumulo-binding -am clean package
### 3. Load Data and Run Tests
### 3. Create the Accumulo table
By default, YCSB uses a table named "usertable". Users must create this table before loading
data into Accumulo. For maximum Accumulo performance, the table should be pre-split. A simple
Ruby script, based on the one in the HBase README, can generate adequate split points. Tens of
tablets per tablet server is a good starting point. Unless otherwise specified, the following
commands should work on any version of Accumulo.
$ echo 'num_splits = 20; puts (1..num_splits).map {|i| "user#{1000+i*(9999-1000)/num_splits}"}' | ruby > /tmp/splits.txt
$ accumulo shell -u <user> -p <password> -e "createtable usertable"
$ accumulo shell -u <user> -p <password> -e "addsplits -t usertable -sf /tmp/splits.txt"
$ accumulo shell -u <user> -p <password> -e "config -t usertable -s table.cache.block.enable=true"
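For reference, the Ruby one-liner above uses integer arithmetic, so the generated /tmp/splits.txt
holds twenty evenly spaced split points of the form:

user1449
user1899
user2349
...
user9999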
Additionally, there are some other configuration properties that can increase performance. These
can be set on the Accumulo table via the shell after it is created. Setting the table durability
to `flush` relaxes the constraints on data durability during hard power outages (it avoids calls
to fsync). Accumulo defaults table compression to `gzip`, which is not particularly fast; `snappy`
is a faster and similarly efficient option. The mutation queue property controls how many writes
Accumulo will buffer in memory before performing a flush; it should be set relative to the amount
of JVM heap the TabletServers are given.
Please note that the `table.durability` and `tserver.total.mutation.queue.max` properties only
exist in Accumulo 1.7 and later. There are no concise replacements for them in earlier versions.
accumulo> config -s table.durability=flush
accumulo> config -s tserver.total.mutation.queue.max=256M
accumulo> config -t usertable -s table.file.compress.type=snappy
On repeated data loads, the following commands may be helpful to quickly reset the state of the table.
accumulo> createtable tmp --copy-splits usertable --copy-config usertable
accumulo> deletetable --force usertable
accumulo> renametable tmp usertable
accumulo> compact --wait -t accumulo.metadata
### 4. Load Data and Run Tests
Load the data:
......
@@ -18,17 +18,25 @@
package com.yahoo.ycsb.db.accumulo;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -39,16 +47,16 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.CleanUp;
import org.apache.hadoop.io.Text;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
/**
* <a href="https://accumulo.apache.org/">Accumulo</a> binding for YCSB.
@@ -57,14 +65,11 @@ public class AccumuloClient extends DB {
private ZooKeeperInstance inst;
private Connector connector;
private String table = "";
private BatchWriter bw = null;
private Text colFam = new Text("");
private Scanner singleScanner = null; // A scanner for reads/deletes.
private Scanner scanScanner = null; // A scanner for use by scan()
private byte[] colFamBytes = new byte[0];
private final ConcurrentHashMap<String, BatchWriter> writers = new ConcurrentHashMap<>();
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
@@ -76,6 +81,7 @@ public class AccumuloClient extends DB {
@Override
public void init() throws DBException {
colFam = new Text(getProperties().getProperty("accumulo.columnFamily"));
colFamBytes = colFam.toString().getBytes(UTF_8);
inst = new ZooKeeperInstance(
getProperties().getProperty("accumulo.instanceName"),
@@ -85,9 +91,7 @@
AuthenticationToken token =
new PasswordToken(getProperties().getProperty("accumulo.password"));
connector = inst.getConnector(principal, token);
} catch (AccumuloException e) {
throw new DBException(e);
} catch (AccumuloSecurityException e) {
} catch (AccumuloException | AccumuloSecurityException e) {
throw new DBException(e);
}
@@ -100,45 +104,56 @@
@Override
public void cleanup() throws DBException {
try {
if (bw != null) {
bw.close();
Iterator<BatchWriter> iterator = writers.values().iterator();
while (iterator.hasNext()) {
BatchWriter writer = iterator.next();
writer.close();
iterator.remove();
}
} catch (MutationsRejectedException e) {
throw new DBException(e);
}
}
/**
* Commonly repeated functionality: Before doing any operation, make sure
* we're working on the correct table. If not, open the correct one.
*
* @param t
* The table to open.
*/
public void checkTable(String t) throws TableNotFoundException {
if (!table.equals(t)) {
getTable(t);
}
}
/**
* Called when the user specifies a table that isn't the same as the existing
* table. Connect to it and if necessary, close our current connection.
*
* @param t
* @param table
* The table to open.
*/
public void getTable(String t) throws TableNotFoundException {
if (bw != null) { // Close the existing writer if necessary.
try {
bw.close();
} catch (MutationsRejectedException e) {
// Couldn't spit out the mutations we wanted.
// Ignore this for now.
System.err.println("MutationsRejectedException: " + e.getMessage());
public BatchWriter getWriter(String table) throws TableNotFoundException {
// tl;dr: We pay a cost for the ConcurrentHashMap here to match the DB API.
// We know that YCSB is really only ever going to send us data for one table, so using
// a concurrent data structure is overkill (especially in such a hot code path).
// However, the impact seems to be relatively negligible in trivial local tests, and it's
// "more correct" WRT the API.
BatchWriter writer = writers.get(table);
if (null == writer) {
BatchWriter newWriter = createBatchWriter(table);
BatchWriter oldWriter = writers.putIfAbsent(table, newWriter);
// Someone beat us to creating a BatchWriter for this table; use theirs.
if (null != oldWriter) {
try {
// Make sure to clean up our new batchwriter!
newWriter.close();
} catch (MutationsRejectedException e) {
throw new RuntimeException(e);
}
writer = oldWriter;
} else {
writer = newWriter;
}
}
return writer;
}
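For comparison, a computeIfAbsent formulation (assuming Java 8 is available) would avoid ever
building a throwaway BatchWriter, at the cost of smuggling the checked TableNotFoundException out
of the mapping function. A hypothetical alternative to the putIfAbsent dance above, not part of
this commit:

public BatchWriter getWriterViaComputeIfAbsent(String table) throws TableNotFoundException {
  // Sketch only: reuses the writers map and createBatchWriter(table) helper from this class.
  try {
    return writers.computeIfAbsent(table, t -> {
      try {
        return createBatchWriter(t);
      } catch (TableNotFoundException e) {
        throw new RuntimeException(e); // unwrapped below
      }
    });
  } catch (RuntimeException e) {
    if (e.getCause() instanceof TableNotFoundException) {
      throw (TableNotFoundException) e.getCause();
    }
    throw e;
  }
}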
/**
* Creates a BatchWriter with the expected configuration.
*
* @param table The table to write to
*/
private BatchWriter createBatchWriter(String table) throws TableNotFoundException {
BatchWriterConfig bwc = new BatchWriterConfig();
bwc.setMaxLatency(
Long.parseLong(getProperties()
@@ -146,16 +161,15 @@ public class AccumuloClient extends DB {
TimeUnit.MILLISECONDS);
bwc.setMaxMemory(Long.parseLong(
getProperties().getProperty("accumulo.batchWriterSize", "100000")));
bwc.setMaxWriteThreads(Integer.parseInt(
getProperties().getProperty("accumulo.batchWriterThreads", "1")));
bw = connector.createBatchWriter(t, bwc);
// Create our scanners
singleScanner = connector.createScanner(t, Authorizations.EMPTY);
scanScanner = connector.createScanner(t, Authorizations.EMPTY);
table = t; // Store the name of the table we have open.
final String numThreadsValue = getProperties().getProperty("accumulo.batchWriterThreads");
// Try to saturate the client machine.
int numThreads = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
if (null != numThreadsValue) {
numThreads = Integer.parseInt(numThreadsValue);
}
System.err.println("Using " + numThreads + " threads to write data");
bwc.setMaxWriteThreads(numThreads);
return connector.createBatchWriter(table, bwc);
}
/**
@@ -165,120 +179,120 @@
* @param fields the set of columns to scan
* @return an Accumulo {@link Scanner} bound to the given row and columns
*/
private Scanner getRow(Text row, Set<String> fields) {
singleScanner.clearColumns();
singleScanner.setRange(new Range(row));
private Scanner getRow(String table, Text row, Set<String> fields) throws TableNotFoundException {
Scanner scanner = connector.createScanner(table, Authorizations.EMPTY);
scanner.setRange(new Range(row));
if (fields != null) {
for (String field : fields) {
singleScanner.fetchColumn(colFam, new Text(field));
scanner.fetchColumn(colFam, new Text(field));
}
}
return singleScanner;
return scanner;
}
@Override
public Status read(String t, String key, Set<String> fields,
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
Scanner scanner = null;
try {
checkTable(t);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return Status.ERROR;
}
try {
scanner = getRow(table, new Text(key), null);
// Pick out the results we care about.
for (Entry<Key, Value> entry : getRow(new Text(key), null)) {
final Text cq = new Text();
for (Entry<Key, Value> entry : scanner) {
entry.getKey().getColumnQualifier(cq);
Value v = entry.getValue();
byte[] buf = v.get();
result.put(entry.getKey().getColumnQualifier().toString(),
result.put(cq.toString(),
new ByteArrayByteIterator(buf));
}
} catch (Exception e) {
System.err.println("Error trying to reading Accumulo table" + key + e);
System.err.println("Error trying to reading Accumulo table " + table + " " + key);
e.printStackTrace();
return Status.ERROR;
} finally {
if (null != scanner) {
scanner.close();
}
}
return Status.OK;
}
@Override
public Status scan(String t, String startkey, int recordcount,
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
try {
checkTable(t);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return Status.ERROR;
}
// There doesn't appear to be a way to create a range for a given
// LENGTH. Just start and end keys. So we'll do this the hard way for
// now:
// Just make the end 'infinity' and only read as much as we need.
scanScanner.clearColumns();
scanScanner.setRange(new Range(new Text(startkey), null));
// Batch size is how many key/values to try to get per call. Here, I'm
// guessing that the number of keys in a row is equal to the number of
// fields we're interested in.
// We try to fetch one more so as to tell when we've run out of fields.
// If no fields are provided, we assume one column/row.
if (fields != null) {
// And add each of them as fields we want.
for (String field : fields) {
scanScanner.fetchColumn(colFam, new Text(field));
Scanner scanner = null;
try {
scanner = connector.createScanner(table, Authorizations.EMPTY);
scanner.setRange(new Range(new Text(startkey), null));
// Have Accumulo send us complete rows, serialized in a single Key-Value pair
IteratorSetting cfg = new IteratorSetting(100, WholeRowIterator.class);
scanner.addScanIterator(cfg);
// If no fields are provided, we assume one column/row.
if (fields != null) {
// And add each of them as fields we want.
for (String field : fields) {
scanner.fetchColumn(colFam, new Text(field));
}
}
}
String rowKey = "";
HashMap<String, ByteIterator> currentHM = null;
int count = 0;
// Begin the iteration.
for (Entry<Key, Value> entry : scanScanner) {
// Check for a new row.
if (!rowKey.equals(entry.getKey().getRow().toString())) {
int count = 0;
for (Entry<Key, Value> entry : scanner) {
// Deserialize the row
SortedMap<Key, Value> row = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue());
HashMap<String, ByteIterator> rowData;
if (null != fields) {
rowData = new HashMap<>(fields.size());
} else {
rowData = new HashMap<>();
}
result.add(rowData);
// Parse the data in the row, avoid unnecessary Text object creation
final Text cq = new Text();
for (Entry<Key, Value> rowEntry : row.entrySet()) {
rowEntry.getKey().getColumnQualifier(cq);
rowData.put(cq.toString(), new ByteArrayByteIterator(rowEntry.getValue().get()));
}
if (++count == recordcount) { // Done reading the requested number of rows.
break;
}
rowKey = entry.getKey().getRow().toString();
if (fields != null) {
// Initial Capacity for all keys.
currentHM = new HashMap<String, ByteIterator>(fields.size());
} else {
// An empty result map.
currentHM = new HashMap<String, ByteIterator>();
}
result.add(currentHM);
}
// Now add the key to the hashmap.
Value v = entry.getValue();
byte[] buf = v.get();
currentHM.put(entry.getKey().getColumnQualifier().toString(),
new ByteArrayByteIterator(buf));
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table.");
e.printStackTrace();
return Status.ERROR;
} catch (IOException e) {
System.err.println("Error deserializing data from Accumulo.");
e.printStackTrace();
return Status.ERROR;
} finally {
if (null != scanner) {
scanner.close();
}
}
return Status.OK;
}
@Override
public Status update(String t, String key,
public Status update(String table, String key,
HashMap<String, ByteIterator> values) {
BatchWriter bw = null;
try {
checkTable(t);
bw = getWriter(table);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
System.err.println("Error opening batch writer to Accumulo table " + table);
e.printStackTrace();
return Status.ERROR;
}
Mutation mutInsert = new Mutation(new Text(key));
Mutation mutInsert = new Mutation(key.getBytes(UTF_8));
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
mutInsert.put(colFam, new Text(entry.getKey()),
System.currentTimeMillis(), new Value(entry.getValue().toArray()));
mutInsert.put(colFamBytes, entry.getKey().getBytes(UTF_8), entry.getValue().toArray());
}
try {
@@ -289,7 +303,7 @@ public class AccumuloClient extends DB {
return Status.ERROR;
}
return Status.OK;
return Status.BATCHED_OK;
}
@Override
@@ -299,17 +313,19 @@
}
@Override
public Status delete(String t, String key) {
public Status delete(String table, String key) {
BatchWriter bw;
try {
checkTable(t);
bw = getWriter(table);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
System.err.println("Error trying to connect to Accumulo table.");
e.printStackTrace();
return Status.ERROR;
}
try {
deleteRow(new Text(key));
} catch (MutationsRejectedException e) {
deleteRow(table, new Text(key), bw);
} catch (TableNotFoundException | MutationsRejectedException e) {
System.err.println("Error performing delete.");
e.printStackTrace();
return Status.ERROR;
@@ -323,24 +339,31 @@
}
// These functions are adapted from RowOperations.java:
private void deleteRow(Text row) throws MutationsRejectedException {
deleteRow(getRow(row, null));
private void deleteRow(String table, Text row, BatchWriter bw) throws MutationsRejectedException,
TableNotFoundException {
// TODO Use a BatchDeleter instead
deleteRow(getRow(table, row, null), bw);
}
/**
* Deletes a row, given a Scanner of JUST that row.
*/
private void deleteRow(Scanner scanner) throws MutationsRejectedException {
private void deleteRow(Scanner scanner, BatchWriter bw) throws MutationsRejectedException {
Mutation deleter = null;
// iterate through the keys
final Text row = new Text();
final Text cf = new Text();
final Text cq = new Text();
for (Entry<Key, Value> entry : scanner) {
// create a mutation for the row
if (deleter == null) {
deleter = new Mutation(entry.getKey().getRow());
entry.getKey().getRow(row);
deleter = new Mutation(row);
}
entry.getKey().getColumnFamily(cf);
entry.getKey().getColumnQualifier(cq);
// the remove function adds the key with the delete flag set to true
deleter.putDelete(entry.getKey().getColumnFamily(),
entry.getKey().getColumnQualifier());
deleter.putDelete(cf, cq);
}
if (deleter != null) { // Nothing to delete if the scan returned no entries.
  bw.addMutation(deleter);
}
......