Skip to content
Snippets Groups Projects
Commit e995c1e3 authored by allanbank's avatar allanbank
Browse files

Merge pull request #464 from allanbank/accumulo-cleanup

[accumulo] Checkstyle cleanup and enforcement for the Accumulo binding.
parents 19e3d270 f3160f6b
No related branches found
No related tags found
No related merge requests found
......@@ -64,4 +64,28 @@ LICENSE file.
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.15</version>
<configuration>
<consoleOutput>true</consoleOutput>
<configLocation>../checkstyle.xml</configLocation>
<failOnViolation>true</failOnViolation>
<failsOnError>true</failsOnError>
</configuration>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<goals>
<goal>checkstyle</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
......@@ -54,400 +54,403 @@ import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
/**
* <a href="https://accumulo.apache.org/">Accumulo</a> binding for YCSB.
*/
public class AccumuloClient extends DB {
// Error code constants.
public static final int Ok = 0;
public static final int ServerError = -1;
public static final int HttpError = -2;
public static final int NoMatchingRecord = -3;
private ZooKeeperInstance _inst;
private Connector _connector;
private String _table = "";
private BatchWriter _bw = null;
private Text _colFam = new Text("");
private Scanner _singleScanner = null; // A scanner for reads/deletes.
private Scanner _scanScanner = null; // A scanner for use by scan()
private static final String PC_PRODUCER = "producer";
private static final String PC_CONSUMER = "consumer";
private String _PC_FLAG = "";
private ZKProducerConsumer.Queue q = null;
private static Hashtable<String,Long> hmKeyReads = null;
private static Hashtable<String,Integer> hmKeyNumReads = null;
private Random r = null;
@Override
public void init() throws DBException {
_colFam = new Text(getProperties().getProperty("accumulo.columnFamily"));
_inst = new ZooKeeperInstance(getProperties().getProperty("accumulo.instanceName"),
getProperties().getProperty("accumulo.zooKeepers"));
try {
String principal = getProperties().getProperty("accumulo.username");
AuthenticationToken token = new PasswordToken(getProperties().getProperty("accumulo.password"));
_connector = _inst.getConnector(principal, token);
} catch (AccumuloException e) {
throw new DBException(e);
} catch (AccumuloSecurityException e) {
throw new DBException(e);
}
_PC_FLAG = getProperties().getProperty("accumulo.PC_FLAG","none");
if (_PC_FLAG.equals(PC_PRODUCER) || _PC_FLAG.equals(PC_CONSUMER)) {
System.out.println("*** YCSB Client is "+_PC_FLAG);
String address = getProperties().getProperty("accumulo.PC_SERVER");
String root = getProperties().getProperty("accumulo.PC_ROOT_IN_ZK");
System.out.println("*** PC_INFO(server:"+address+";root="+root+")");
q = new ZKProducerConsumer.Queue(address, root);
r = new Random();
}
if (_PC_FLAG.equals(PC_CONSUMER)) {
hmKeyReads = new Hashtable<String,Long>();
hmKeyNumReads = new Hashtable<String,Integer>();
keyNotification(null);
}
}
@Override
public void cleanup() throws DBException
{
try {
if (_bw != null) {
_bw.close();
}
} catch (MutationsRejectedException e) {
throw new DBException(e);
}
CleanUp.shutdownNow();
}
/**
* Commonly repeated functionality: Before doing any operation, make sure
* we're working on the correct table. If not, open the correct one.
*
* @param table
*/
public void checkTable(String table) throws TableNotFoundException {
if (!_table.equals(table)) {
getTable(table);
}
}
/**
* Called when the user specifies a table that isn't the same as the
* existing table. Connect to it and if necessary, close our current
* connection.
*
* @param table
*/
public void getTable(String table) throws TableNotFoundException {
if (_bw != null) { // Close the existing writer if necessary.
try {
_bw.close();
} catch (MutationsRejectedException e) {
// Couldn't spit out the mutations we wanted.
// Ignore this for now.
}
}
BatchWriterConfig bwc = new BatchWriterConfig();
bwc.setMaxLatency(Long.parseLong(getProperties().getProperty("accumulo.batchWriterMaxLatency", "30000")), TimeUnit.MILLISECONDS);
bwc.setMaxMemory(Long.parseLong(getProperties().getProperty("accumulo.batchWriterSize", "100000")));
bwc.setMaxWriteThreads(Integer.parseInt(getProperties().getProperty("accumulo.batchWriterThreads", "1")));
_bw = _connector.createBatchWriter(table, bwc);
// Create our scanners
_singleScanner = _connector.createScanner(table, Authorizations.EMPTY);
_scanScanner = _connector.createScanner(table, Authorizations.EMPTY);
_table = table; // Store the name of the table we have open.
}
/**
* Gets a scanner from Accumulo over one row
*
* @param row the row to scan
* @param fields the set of columns to scan
* @return an Accumulo {@link Scanner} bound to the given row and columns
*/
private Scanner getRow(Text row, Set<String> fields)
{
_singleScanner.clearColumns();
_singleScanner.setRange(new Range(row));
if (fields != null) {
for(String field:fields)
{
_singleScanner.fetchColumn(_colFam, new Text(field));
}
}
return _singleScanner;
}
@Override
public int read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
try {
checkTable(table);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return ServerError;
}
try {
// Pick out the results we care about.
for (Entry<Key, Value> entry : getRow(new Text(key), null)) {
Value v = entry.getValue();
byte[] buf = v.get();
result.put(entry.getKey().getColumnQualifier().toString(),
new ByteArrayByteIterator(buf));
}
} catch (Exception e) {
System.err.println("Error trying to reading Accumulo table" + key + e);
return ServerError;
}
return Ok;
}
@Override
public int scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
try {
checkTable(table);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return ServerError;
}
// There doesn't appear to be a way to create a range for a given
// LENGTH. Just start and end keys. So we'll do this the hard way for now:
// Just make the end 'infinity' and only read as much as we need.
_scanScanner.clearColumns();
_scanScanner.setRange(new Range(new Text(startkey), null));
// Batch size is how many key/values to try to get per call. Here, I'm
// guessing that the number of keys in a row is equal to the number of fields
// we're interested in.
// We try to fetch one more so as to tell when we've run out of fields.
if (fields != null) {
// And add each of them as fields we want.
for(String field:fields)
{
_scanScanner.fetchColumn(_colFam, new Text(field));
}
} else {
// If no fields are provided, we assume one column/row.
}
String rowKey = "";
HashMap<String, ByteIterator> currentHM = null;
int count = 0;
// Begin the iteration.
for (Entry<Key, Value> entry : _scanScanner) {
// Check for a new row.
if (!rowKey.equals(entry.getKey().getRow().toString())) {
if (count++ == recordcount) { // Done reading the last row.
break;
}
rowKey = entry.getKey().getRow().toString();
if (fields != null) {
// Initial Capacity for all keys.
currentHM = new HashMap<String, ByteIterator>(fields.size());
}
else
{
// An empty result map.
currentHM = new HashMap<String, ByteIterator>();
}
result.add(currentHM);
}
// Now add the key to the hashmap.
Value v = entry.getValue();
byte[] buf = v.get();
currentHM.put(entry.getKey().getColumnQualifier().toString(), new ByteArrayByteIterator(buf));
}
return Ok;
}
@Override
public int update(String table, String key, HashMap<String, ByteIterator> values) {
try {
checkTable(table);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return ServerError;
}
Mutation mutInsert = new Mutation(new Text(key));
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
mutInsert.put(_colFam, new Text(entry.getKey()), System
.currentTimeMillis(),
new Value(entry.getValue().toArray()));
}
try {
_bw.addMutation(mutInsert);
// Distributed YCSB co-ordination: YCSB on a client produces the key to
// be stored in the shared queue in ZooKeeper.
if (_PC_FLAG.equals(PC_PRODUCER)) {
if (r.nextFloat() < 0.01)
keyNotification(key);
}
} catch (MutationsRejectedException e) {
System.err.println("Error performing update.");
e.printStackTrace();
return ServerError;
}
return Ok;
}
@Override
public int insert(String table, String key, HashMap<String, ByteIterator> values) {
return update(table, key, values);
}
@Override
public int delete(String table, String key) {
try {
checkTable(table);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return ServerError;
}
try {
deleteRow(new Text(key));
} catch (RuntimeException e) {
System.err.println("Error performing delete.");
e.printStackTrace();
return ServerError;
}
return Ok;
}
// These functions are adapted from RowOperations.java:
private void deleteRow(Text row) {
deleteRow(getRow(row, null));
}
/**
* Deletes a row, given a Scanner of JUST that row
*
*/
private void deleteRow(Scanner scanner) {
Mutation deleter = null;
// iterate through the keys
for (Entry<Key,Value> entry : scanner) {
// create a mutation for the row
if (deleter == null)
deleter = new Mutation(entry.getKey().getRow());
// the remove function adds the key with the delete flag set to true
deleter.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
}
try {
_bw.addMutation(deleter);
} catch (MutationsRejectedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void keyNotification(String key) {
if (_PC_FLAG.equals(PC_PRODUCER)) {
try {
q.produce(key);
} catch (KeeperException e) {
} catch (InterruptedException e) {
}
} else {
//XXX: do something better to keep the loop going (while??)
for (int i = 0; i < 10000000; i++) {
try {
String strKey = q.consume();
if ((hmKeyReads.containsKey(strKey) == false) &&
(hmKeyNumReads.containsKey(strKey) == false)) {
hmKeyReads.put(strKey, new Long(System.currentTimeMillis()));
hmKeyNumReads.put(strKey, new Integer(1));
}
//YCSB Consumer will read the key that was fetched from the
//queue in ZooKeeper.
//(current way is kind of ugly but works, i think)
//TODO : Get table name from configuration or argument
String table = "usertable";
HashSet<String> fields = new HashSet<String>();
for (int j=0; j<9; j++)
fields.add("field"+j);
HashMap<String,ByteIterator> result = new HashMap<String,ByteIterator>();
int retval = read(table, strKey, fields, result);
//If the results are empty, the key is enqueued in Zookeeper
//and tried again, until the results are found.
if (result.size() == 0) {
q.produce(strKey);
int count = ((Integer)hmKeyNumReads.get(strKey)).intValue();
hmKeyNumReads.put(strKey, new Integer(count+1));
}
else {
if (((Integer)hmKeyNumReads.get(strKey)).intValue() > 1) {
long currTime = System.currentTimeMillis();
long writeTime = ((Long)hmKeyReads.get(strKey)).longValue();
System.out.println("Key="+strKey+
//";StartSearch="+writeTime+
//";EndSearch="+currTime+
";TimeLag="+(currTime-writeTime));
}
}
} catch (KeeperException e) {
} catch (InterruptedException e) {
}
}
}
}
public int presplit(String table, String[] keys)
{
TreeSet<Text> splits = new TreeSet<Text>();
for (int i = 0;i < keys.length; i ++)
{
splits.add(new Text(keys[i]));
}
try {
_connector.tableOperations().addSplits(table, splits);
} catch (TableNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (AccumuloException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (AccumuloSecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return Ok;
}
// Error code constants.
public static final int OK = 0;
public static final int SERVER_ERROR = -1;
public static final int HTTP_ERROR = -2;
public static final int NO_MATCHING_RECORD = -3;
private ZooKeeperInstance inst;
private Connector connector;
private String table = "";
private BatchWriter bw = null;
private Text colFam = new Text("");
private Scanner singleScanner = null; // A scanner for reads/deletes.
private Scanner scanScanner = null; // A scanner for use by scan()
private static final String PC_PRODUCER = "producer";
private static final String PC_CONSUMER = "consumer";
private String pcFlag = "";
private ZKProducerConsumer.Queue q = null;
private static Hashtable<String, Long> hmKeyReads = null;
private static Hashtable<String, Integer> hmKeyNumReads = null;
private Random r = null;
@Override
public void init() throws DBException {
colFam = new Text(getProperties().getProperty("accumulo.columnFamily"));
inst = new ZooKeeperInstance(
getProperties().getProperty("accumulo.instanceName"),
getProperties().getProperty("accumulo.zooKeepers"));
try {
String principal = getProperties().getProperty("accumulo.username");
AuthenticationToken token =
new PasswordToken(getProperties().getProperty("accumulo.password"));
connector = inst.getConnector(principal, token);
} catch (AccumuloException e) {
throw new DBException(e);
} catch (AccumuloSecurityException e) {
throw new DBException(e);
}
pcFlag = getProperties().getProperty("accumulo.PC_FLAG", "none");
if (pcFlag.equals(PC_PRODUCER) || pcFlag.equals(PC_CONSUMER)) {
System.out.println("*** YCSB Client is " + pcFlag);
String address = getProperties().getProperty("accumulo.PC_SERVER");
String root = getProperties().getProperty("accumulo.PC_ROOT_IN_ZK");
System.out
.println("*** PC_INFO(server:" + address + ";root=" + root + ")");
q = new ZKProducerConsumer.Queue(address, root);
r = new Random();
}
if (pcFlag.equals(PC_CONSUMER)) {
hmKeyReads = new Hashtable<String, Long>();
hmKeyNumReads = new Hashtable<String, Integer>();
try {
keyNotification(null);
} catch (KeeperException e) {
throw new DBException(e);
}
}
}
@Override
public void cleanup() throws DBException {
try {
if (bw != null) {
bw.close();
}
} catch (MutationsRejectedException e) {
throw new DBException(e);
}
CleanUp.shutdownNow();
}
/**
* Commonly repeated functionality: Before doing any operation, make sure
* we're working on the correct table. If not, open the correct one.
*
* @param t
* The table to open.
*/
public void checkTable(String t) throws TableNotFoundException {
if (!table.equals(t)) {
getTable(t);
}
}
/**
* Called when the user specifies a table that isn't the same as the existing
* table. Connect to it and if necessary, close our current connection.
*
* @param t
* The table to open.
*/
public void getTable(String t) throws TableNotFoundException {
if (bw != null) { // Close the existing writer if necessary.
try {
bw.close();
} catch (MutationsRejectedException e) {
// Couldn't spit out the mutations we wanted.
// Ignore this for now.
System.err.println("MutationsRejectedException: " + e.getMessage());
}
}
BatchWriterConfig bwc = new BatchWriterConfig();
bwc.setMaxLatency(
Long.parseLong(getProperties()
.getProperty("accumulo.batchWriterMaxLatency", "30000")),
TimeUnit.MILLISECONDS);
bwc.setMaxMemory(Long.parseLong(
getProperties().getProperty("accumulo.batchWriterSize", "100000")));
bwc.setMaxWriteThreads(Integer.parseInt(
getProperties().getProperty("accumulo.batchWriterThreads", "1")));
bw = connector.createBatchWriter(table, bwc);
// Create our scanners
singleScanner = connector.createScanner(table, Authorizations.EMPTY);
scanScanner = connector.createScanner(table, Authorizations.EMPTY);
table = t; // Store the name of the table we have open.
}
/**
* Gets a scanner from Accumulo over one row.
*
* @param row
* the row to scan
* @param fields
* the set of columns to scan
* @return an Accumulo {@link Scanner} bound to the given row and columns
*/
private Scanner getRow(Text row, Set<String> fields) {
singleScanner.clearColumns();
singleScanner.setRange(new Range(row));
if (fields != null) {
for (String field : fields) {
singleScanner.fetchColumn(colFam, new Text(field));
}
}
return singleScanner;
}
@Override
public int read(String t, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
try {
checkTable(t);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return SERVER_ERROR;
}
try {
// Pick out the results we care about.
for (Entry<Key, Value> entry : getRow(new Text(key), null)) {
Value v = entry.getValue();
byte[] buf = v.get();
result.put(entry.getKey().getColumnQualifier().toString(),
new ByteArrayByteIterator(buf));
}
} catch (Exception e) {
System.err.println("Error trying to reading Accumulo table" + key + e);
return SERVER_ERROR;
}
return OK;
}
@Override
public int scan(String t, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
try {
checkTable(t);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return SERVER_ERROR;
}
// There doesn't appear to be a way to create a range for a given
// LENGTH. Just start and end keys. So we'll do this the hard way for
// now:
// Just make the end 'infinity' and only read as much as we need.
scanScanner.clearColumns();
scanScanner.setRange(new Range(new Text(startkey), null));
// Batch size is how many key/values to try to get per call. Here, I'm
// guessing that the number of keys in a row is equal to the number of
// fields
// we're interested in.
// We try to fetch one more so as to tell when we've run out of fields.
if (fields != null) {
// And add each of them as fields we want.
for (String field : fields) {
scanScanner.fetchColumn(colFam, new Text(field));
}
} // else - If no fields are provided, we assume one column/row.
String rowKey = "";
HashMap<String, ByteIterator> currentHM = null;
int count = 0;
// Begin the iteration.
for (Entry<Key, Value> entry : scanScanner) {
// Check for a new row.
if (!rowKey.equals(entry.getKey().getRow().toString())) {
if (count++ == recordcount) { // Done reading the last row.
break;
}
rowKey = entry.getKey().getRow().toString();
if (fields != null) {
// Initial Capacity for all keys.
currentHM = new HashMap<String, ByteIterator>(fields.size());
} else {
// An empty result map.
currentHM = new HashMap<String, ByteIterator>();
}
result.add(currentHM);
}
// Now add the key to the hashmap.
Value v = entry.getValue();
byte[] buf = v.get();
currentHM.put(entry.getKey().getColumnQualifier().toString(),
new ByteArrayByteIterator(buf));
}
return OK;
}
@Override
public int update(String t, String key,
HashMap<String, ByteIterator> values) {
try {
checkTable(t);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return SERVER_ERROR;
}
Mutation mutInsert = new Mutation(new Text(key));
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
mutInsert.put(colFam, new Text(entry.getKey()),
System.currentTimeMillis(), new Value(entry.getValue().toArray()));
}
try {
bw.addMutation(mutInsert);
// Distributed YCSB co-ordination: YCSB on a client produces the key
// to
// be stored in the shared queue in ZooKeeper.
if (pcFlag.equals(PC_PRODUCER)) {
if (r.nextFloat() < 0.01) {
keyNotification(key);
}
}
} catch (MutationsRejectedException e) {
System.err.println("Error performing update.");
e.printStackTrace();
return SERVER_ERROR;
} catch (KeeperException e) {
System.err.println("Error notifying the Zookeeper Queue.");
e.printStackTrace();
return SERVER_ERROR;
}
return OK;
}
@Override
public int insert(String t, String key,
HashMap<String, ByteIterator> values) {
return update(t, key, values);
}
@Override
public int delete(String t, String key) {
try {
checkTable(t);
} catch (TableNotFoundException e) {
System.err.println("Error trying to connect to Accumulo table." + e);
return SERVER_ERROR;
}
try {
deleteRow(new Text(key));
} catch (MutationsRejectedException e) {
System.err.println("Error performing delete.");
e.printStackTrace();
return SERVER_ERROR;
} catch (RuntimeException e) {
System.err.println("Error performing delete.");
e.printStackTrace();
return SERVER_ERROR;
}
return OK;
}
// These functions are adapted from RowOperations.java:
private void deleteRow(Text row) throws MutationsRejectedException {
deleteRow(getRow(row, null));
}
/**
* Deletes a row, given a Scanner of JUST that row.
*/
private void deleteRow(Scanner scanner) throws MutationsRejectedException {
Mutation deleter = null;
// iterate through the keys
for (Entry<Key, Value> entry : scanner) {
// create a mutation for the row
if (deleter == null) {
deleter = new Mutation(entry.getKey().getRow());
}
// the remove function adds the key with the delete flag set to true
deleter.putDelete(entry.getKey().getColumnFamily(),
entry.getKey().getColumnQualifier());
}
bw.addMutation(deleter);
}
private void keyNotification(String key) throws KeeperException {
if (pcFlag.equals(PC_PRODUCER)) {
try {
q.produce(key);
} catch (InterruptedException e) {
// Reset the interrupted state.
Thread.currentThread().interrupt();
}
} else {
// XXX: do something better to keep the loop going (while??)
for (int i = 0; i < 10000000; i++) {
try {
String strKey = q.consume();
if (!hmKeyReads.containsKey(strKey)
&& !hmKeyNumReads.containsKey(strKey)) {
hmKeyReads.put(strKey, new Long(System.currentTimeMillis()));
hmKeyNumReads.put(strKey, new Integer(1));
}
// YCSB Consumer will read the key that was fetched from the
// queue in ZooKeeper.
// (current way is kind of ugly but works, i think)
// TODO : Get table name from configuration or argument
String usertable = "usertable";
HashSet<String> fields = new HashSet<String>();
for (int j = 0; j < 9; j++) {
fields.add("field" + j);
}
HashMap<String, ByteIterator> result =
new HashMap<String, ByteIterator>();
int retval = read(usertable, strKey, fields, result);
// If the results are empty, the key is enqueued in
// Zookeeper
// and tried again, until the results are found.
if (result.size() == 0) {
q.produce(strKey);
int count = ((Integer) hmKeyNumReads.get(strKey)).intValue();
hmKeyNumReads.put(strKey, new Integer(count + 1));
} else {
if (((Integer) hmKeyNumReads.get(strKey)).intValue() > 1) {
long currTime = System.currentTimeMillis();
long writeTime = ((Long) hmKeyReads.get(strKey)).longValue();
System.out.println(
"Key=" + strKey + ";StartSearch=" + writeTime + ";EndSearch="
+ currTime + ";TimeLag=" + (currTime - writeTime));
}
}
} catch (InterruptedException e) {
// Reset the interrupted state.
Thread.currentThread().interrupt();
}
}
}
}
public int presplit(String t, String[] keys)
throws TableNotFoundException, AccumuloException,
AccumuloSecurityException {
TreeSet<Text> splits = new TreeSet<Text>();
for (int i = 0; i < keys.length; i++) {
splits.add(new Text(keys[i]));
}
connector.tableOperations().addSplits(t, splits);
return OK;
}
}
......@@ -21,7 +21,6 @@ package com.yahoo.ycsb.db;
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
......@@ -30,111 +29,165 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
// Implementing the PC Queue in ZooKeeper
//
/**
* Implementing the PC (Producer/Consumer) Queue in ZooKeeper.
*/
public class ZKProducerConsumer implements Watcher {
static ZooKeeper zk = null;
static Integer mutex;
String root;
// Constructor that takes tha address of the ZK server
//
ZKProducerConsumer(String address) {
if(zk == null){
try {
System.out.println("Starting ZK:");
zk = new ZooKeeper(address, 3000, this);
mutex = new Integer(-1);
System.out.println("Finished starting ZK: " + zk);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
}
}
//else mutex = new Integer(-1);
private static ZooKeeper zk = null;
private static Integer mutex;
private String root;
/**
* Constructor that takes the address of the ZK server.
*
* @param address
* The address of the ZK server.
*/
ZKProducerConsumer(String address) {
if (zk == null) {
try {
System.out.println("Starting ZK:");
zk = new ZooKeeper(address, 3000, this);
mutex = new Integer(-1);
System.out.println("Finished starting ZK: " + zk);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
}
}
// else mutex = new Integer(-1);
}
synchronized public void process(WatchedEvent event) {
synchronized (mutex) {
//System.out.println("Process: " + event.getType());
mutex.notify();
}
public synchronized void process(WatchedEvent event) {
synchronized (mutex) {
// System.out.println("Process: " + event.getType());
mutex.notify();
}
static public class QueueElement {
public String key;
public long writeTime;
QueueElement(String key, long writeTime) {
this.key = key;
this.writeTime = writeTime;
}
}
/**
* Returns the root.
*
* @return The root.
*/
protected String getRoot() {
return root;
}
/**
* Sets the root.
*
* @param r
* The root value.
*/
protected void setRoot(String r) {
this.root = r;
}
/**
* QueueElement a single queue element. No longer used.
* @deprecated No longer used.
*/
@Deprecated
public static class QueueElement {
private String key;
private long writeTime;
QueueElement(String key, long writeTime) {
this.key = key;
this.writeTime = writeTime;
}
// Producer-Consumer queue
static public class Queue extends ZKProducerConsumer {
// Constructor of producer-consumer queue
Queue(String address, String name) {
super(address);
this.root = name;
// Create ZK node name
if (zk != null) {
try {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
}
/**
* Producer-Consumer queue.
*/
public static class Queue extends ZKProducerConsumer {
/**
* Constructor of producer-consumer queue.
*
* @param address
* The Zookeeper server address.
* @param name
* The name of the root element for the queue.
*/
Queue(String address, String name) {
super(address);
this.setRoot(name);
// Create ZK node name
if (zk != null) {
try {
Stat s = zk.exists(getRoot(), false);
if (s == null) {
zk.create(getRoot(), new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out.println(
"Keeper exception when instantiating queue: " + e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
}
// Producer calls this method to insert the key in the queue
//
boolean produce(String key) throws KeeperException, InterruptedException{
byte[] value;
value = key.getBytes();
zk.create(root + "/key", value,
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
/**
* Producer calls this method to insert the key in the queue.
*
* @param key
* The key to produce (add to the queue).
* @return True if the key was added.
* @throws KeeperException
* On a failure talking to zookeeper.
* @throws InterruptedException
* If the current thread is interrupted waiting for the zookeeper
* acknowledgement.
*/
//
boolean produce(String key) throws KeeperException, InterruptedException {
byte[] value;
value = key.getBytes();
zk.create(getRoot() + "/key", value, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}
return true;
}
// Consumer calls this method to "wait" for the key to the available
//
String consume() throws KeeperException, InterruptedException {
String retvalue = null;
Stat stat = null;
// Get the first element available
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() == 0) {
System.out.println("Going to wait");
mutex.wait();
} else {
String path = root+"/"+list.get(0);
byte[] b = zk.getData(path, false, stat);
retvalue = new String(b);
zk.delete(path, -1);
return retvalue;
}
}
}
/**
* Consumer calls this method to "wait" for the key to the available.
*
* @return The key to consumed (remove from the queue).
* @throws KeeperException
* On a failure talking to zookeeper.
* @throws InterruptedException
* If the current thread is interrupted waiting for the zookeeper
* acknowledgement.
*/
String consume() throws KeeperException, InterruptedException {
String retvalue = null;
Stat stat = null;
// Get the first element available
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(getRoot(), true);
if (list.size() == 0) {
System.out.println("Going to wait");
mutex.wait();
} else {
String path = getRoot() + "/" + list.get(0);
byte[] b = zk.getData(path, false, stat);
retvalue = new String(b);
zk.delete(path, -1);
return retvalue;
}
}
}
}
}
}
/**
* Copyright (c) 2015 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
* YCSB binding for <a href="https://accumulo.apache.org/">Accumulo</a>.
*/
package com.yahoo.ycsb.db;
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment