diff --git a/accumulo/pom.xml b/accumulo/pom.xml index e9f706ffa84bfc58990db62298ee643618a05bb3..3e71def2494167035f3718e2a8a07579b579a8e9 100644 --- a/accumulo/pom.xml +++ b/accumulo/pom.xml @@ -64,4 +64,28 @@ LICENSE file. <scope>provided</scope> </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.15</version> + <configuration> + <consoleOutput>true</consoleOutput> + <configLocation>../checkstyle.xml</configLocation> + <failOnViolation>true</failOnViolation> + <failsOnError>true</failsOnError> + </configuration> + <executions> + <execution> + <id>validate</id> + <phase>validate</phase> + <goals> + <goal>checkstyle</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java b/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java index d661908e72c9c860095a6161f6a9903ed6e436bc..e4e79f929c82c4ee336eae58c83bf499b6158d59 100644 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java +++ b/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java @@ -54,400 +54,403 @@ import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; +/** + * <a href="https://accumulo.apache.org/">Accumulo</a> binding for YCSB. + */ public class AccumuloClient extends DB { - // Error code constants. - public static final int Ok = 0; - public static final int ServerError = -1; - public static final int HttpError = -2; - public static final int NoMatchingRecord = -3; - - private ZooKeeperInstance _inst; - private Connector _connector; - private String _table = ""; - private BatchWriter _bw = null; - private Text _colFam = new Text(""); - private Scanner _singleScanner = null; // A scanner for reads/deletes. - private Scanner _scanScanner = null; // A scanner for use by scan() - - private static final String PC_PRODUCER = "producer"; - private static final String PC_CONSUMER = "consumer"; - private String _PC_FLAG = ""; - private ZKProducerConsumer.Queue q = null; - private static Hashtable<String,Long> hmKeyReads = null; - private static Hashtable<String,Integer> hmKeyNumReads = null; - private Random r = null; - - - @Override - public void init() throws DBException { - _colFam = new Text(getProperties().getProperty("accumulo.columnFamily")); - - _inst = new ZooKeeperInstance(getProperties().getProperty("accumulo.instanceName"), - getProperties().getProperty("accumulo.zooKeepers")); - try { - String principal = getProperties().getProperty("accumulo.username"); - AuthenticationToken token = new PasswordToken(getProperties().getProperty("accumulo.password")); - _connector = _inst.getConnector(principal, token); - } catch (AccumuloException e) { - throw new DBException(e); - } catch (AccumuloSecurityException e) { - throw new DBException(e); - } - - _PC_FLAG = getProperties().getProperty("accumulo.PC_FLAG","none"); - if (_PC_FLAG.equals(PC_PRODUCER) || _PC_FLAG.equals(PC_CONSUMER)) { - System.out.println("*** YCSB Client is "+_PC_FLAG); - String address = getProperties().getProperty("accumulo.PC_SERVER"); - String root = getProperties().getProperty("accumulo.PC_ROOT_IN_ZK"); - System.out.println("*** PC_INFO(server:"+address+";root="+root+")"); - q = new ZKProducerConsumer.Queue(address, root); - r = new Random(); - } - - if (_PC_FLAG.equals(PC_CONSUMER)) { - hmKeyReads = new Hashtable<String,Long>(); - hmKeyNumReads = new Hashtable<String,Integer>(); - keyNotification(null); - } - } - - - @Override - public void cleanup() throws DBException - { - try { - if (_bw != null) { - _bw.close(); - } - } catch (MutationsRejectedException e) { - throw new DBException(e); - } - CleanUp.shutdownNow(); - } - - /** - * Commonly repeated functionality: Before doing any operation, make sure - * we're working on the correct table. If not, open the correct one. - * - * @param table - */ - public void checkTable(String table) throws TableNotFoundException { - if (!_table.equals(table)) { - getTable(table); - } - } - - /** - * Called when the user specifies a table that isn't the same as the - * existing table. Connect to it and if necessary, close our current - * connection. - * - * @param table - */ - public void getTable(String table) throws TableNotFoundException { - if (_bw != null) { // Close the existing writer if necessary. - try { - _bw.close(); - } catch (MutationsRejectedException e) { - // Couldn't spit out the mutations we wanted. - // Ignore this for now. - } - } - - BatchWriterConfig bwc = new BatchWriterConfig(); - bwc.setMaxLatency(Long.parseLong(getProperties().getProperty("accumulo.batchWriterMaxLatency", "30000")), TimeUnit.MILLISECONDS); - bwc.setMaxMemory(Long.parseLong(getProperties().getProperty("accumulo.batchWriterSize", "100000"))); - bwc.setMaxWriteThreads(Integer.parseInt(getProperties().getProperty("accumulo.batchWriterThreads", "1"))); - - _bw = _connector.createBatchWriter(table, bwc); - - // Create our scanners - _singleScanner = _connector.createScanner(table, Authorizations.EMPTY); - _scanScanner = _connector.createScanner(table, Authorizations.EMPTY); - - _table = table; // Store the name of the table we have open. - } - - /** - * Gets a scanner from Accumulo over one row - * - * @param row the row to scan - * @param fields the set of columns to scan - * @return an Accumulo {@link Scanner} bound to the given row and columns - */ - private Scanner getRow(Text row, Set<String> fields) - { - _singleScanner.clearColumns(); - _singleScanner.setRange(new Range(row)); - if (fields != null) { - for(String field:fields) - { - _singleScanner.fetchColumn(_colFam, new Text(field)); - } - } - return _singleScanner; - } - - @Override - public int read(String table, String key, Set<String> fields, - HashMap<String, ByteIterator> result) { - - try { - checkTable(table); - } catch (TableNotFoundException e) { - System.err.println("Error trying to connect to Accumulo table." + e); - return ServerError; - } - - try { - // Pick out the results we care about. - for (Entry<Key, Value> entry : getRow(new Text(key), null)) { - Value v = entry.getValue(); - byte[] buf = v.get(); - result.put(entry.getKey().getColumnQualifier().toString(), - new ByteArrayByteIterator(buf)); - } - } catch (Exception e) { - System.err.println("Error trying to reading Accumulo table" + key + e); - return ServerError; - } - return Ok; - - } - - @Override - public int scan(String table, String startkey, int recordcount, - Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { - try { - checkTable(table); - } catch (TableNotFoundException e) { - System.err.println("Error trying to connect to Accumulo table." + e); - return ServerError; - } - - // There doesn't appear to be a way to create a range for a given - // LENGTH. Just start and end keys. So we'll do this the hard way for now: - // Just make the end 'infinity' and only read as much as we need. - _scanScanner.clearColumns(); - _scanScanner.setRange(new Range(new Text(startkey), null)); - - // Batch size is how many key/values to try to get per call. Here, I'm - // guessing that the number of keys in a row is equal to the number of fields - // we're interested in. - // We try to fetch one more so as to tell when we've run out of fields. - - if (fields != null) { - // And add each of them as fields we want. - for(String field:fields) - { - _scanScanner.fetchColumn(_colFam, new Text(field)); - } - } else { - // If no fields are provided, we assume one column/row. - } - - String rowKey = ""; - HashMap<String, ByteIterator> currentHM = null; - int count = 0; - - // Begin the iteration. - for (Entry<Key, Value> entry : _scanScanner) { - // Check for a new row. - if (!rowKey.equals(entry.getKey().getRow().toString())) { - if (count++ == recordcount) { // Done reading the last row. - break; - } - rowKey = entry.getKey().getRow().toString(); - if (fields != null) { - // Initial Capacity for all keys. - currentHM = new HashMap<String, ByteIterator>(fields.size()); - } - else - { - // An empty result map. - currentHM = new HashMap<String, ByteIterator>(); - } - result.add(currentHM); - } - // Now add the key to the hashmap. - Value v = entry.getValue(); - byte[] buf = v.get(); - currentHM.put(entry.getKey().getColumnQualifier().toString(), new ByteArrayByteIterator(buf)); - } - - return Ok; - - } - - @Override - public int update(String table, String key, HashMap<String, ByteIterator> values) { - try { - checkTable(table); - } catch (TableNotFoundException e) { - System.err.println("Error trying to connect to Accumulo table." + e); - return ServerError; - } - - Mutation mutInsert = new Mutation(new Text(key)); - for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { - mutInsert.put(_colFam, new Text(entry.getKey()), System - .currentTimeMillis(), - new Value(entry.getValue().toArray())); - } - - try { - _bw.addMutation(mutInsert); - // Distributed YCSB co-ordination: YCSB on a client produces the key to - // be stored in the shared queue in ZooKeeper. - if (_PC_FLAG.equals(PC_PRODUCER)) { - if (r.nextFloat() < 0.01) - keyNotification(key); - } - } catch (MutationsRejectedException e) { - System.err.println("Error performing update."); - e.printStackTrace(); - return ServerError; - } - - - return Ok; - } - - @Override - public int insert(String table, String key, HashMap<String, ByteIterator> values) { - return update(table, key, values); - } - - @Override - public int delete(String table, String key) { - try { - checkTable(table); - } catch (TableNotFoundException e) { - System.err.println("Error trying to connect to Accumulo table." + e); - return ServerError; - } - - try { - deleteRow(new Text(key)); - } catch (RuntimeException e) { - System.err.println("Error performing delete."); - e.printStackTrace(); - return ServerError; - } - - return Ok; - } - - // These functions are adapted from RowOperations.java: - private void deleteRow(Text row) { - deleteRow(getRow(row, null)); - } - - - /** - * Deletes a row, given a Scanner of JUST that row - * - */ - private void deleteRow(Scanner scanner) { - Mutation deleter = null; - // iterate through the keys - for (Entry<Key,Value> entry : scanner) { - // create a mutation for the row - if (deleter == null) - deleter = new Mutation(entry.getKey().getRow()); - // the remove function adds the key with the delete flag set to true - deleter.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); - } - try { - _bw.addMutation(deleter); - } catch (MutationsRejectedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - } - - private void keyNotification(String key) { - - if (_PC_FLAG.equals(PC_PRODUCER)) { - try { - q.produce(key); - } catch (KeeperException e) { - - } catch (InterruptedException e) { - - } - } else { - //XXX: do something better to keep the loop going (while??) - for (int i = 0; i < 10000000; i++) { - try { - String strKey = q.consume(); - - if ((hmKeyReads.containsKey(strKey) == false) && - (hmKeyNumReads.containsKey(strKey) == false)) { - hmKeyReads.put(strKey, new Long(System.currentTimeMillis())); - hmKeyNumReads.put(strKey, new Integer(1)); - } - - //YCSB Consumer will read the key that was fetched from the - //queue in ZooKeeper. - //(current way is kind of ugly but works, i think) - //TODO : Get table name from configuration or argument - String table = "usertable"; - HashSet<String> fields = new HashSet<String>(); - for (int j=0; j<9; j++) - fields.add("field"+j); - HashMap<String,ByteIterator> result = new HashMap<String,ByteIterator>(); - - int retval = read(table, strKey, fields, result); - //If the results are empty, the key is enqueued in Zookeeper - //and tried again, until the results are found. - if (result.size() == 0) { - q.produce(strKey); - int count = ((Integer)hmKeyNumReads.get(strKey)).intValue(); - hmKeyNumReads.put(strKey, new Integer(count+1)); - } - else { - if (((Integer)hmKeyNumReads.get(strKey)).intValue() > 1) { - long currTime = System.currentTimeMillis(); - long writeTime = ((Long)hmKeyReads.get(strKey)).longValue(); - System.out.println("Key="+strKey+ - //";StartSearch="+writeTime+ - //";EndSearch="+currTime+ - ";TimeLag="+(currTime-writeTime)); - } - } - - } catch (KeeperException e) { - - } catch (InterruptedException e) { - - } - } - } - - } - - public int presplit(String table, String[] keys) - { - TreeSet<Text> splits = new TreeSet<Text>(); - for (int i = 0;i < keys.length; i ++) - { - splits.add(new Text(keys[i])); - } - try { - _connector.tableOperations().addSplits(table, splits); - } catch (TableNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (AccumuloException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (AccumuloSecurityException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return Ok; - } + // Error code constants. + public static final int OK = 0; + public static final int SERVER_ERROR = -1; + public static final int HTTP_ERROR = -2; + public static final int NO_MATCHING_RECORD = -3; + + private ZooKeeperInstance inst; + private Connector connector; + private String table = ""; + private BatchWriter bw = null; + private Text colFam = new Text(""); + private Scanner singleScanner = null; // A scanner for reads/deletes. + private Scanner scanScanner = null; // A scanner for use by scan() + + private static final String PC_PRODUCER = "producer"; + private static final String PC_CONSUMER = "consumer"; + private String pcFlag = ""; + private ZKProducerConsumer.Queue q = null; + private static Hashtable<String, Long> hmKeyReads = null; + private static Hashtable<String, Integer> hmKeyNumReads = null; + private Random r = null; + + @Override + public void init() throws DBException { + colFam = new Text(getProperties().getProperty("accumulo.columnFamily")); + + inst = new ZooKeeperInstance( + getProperties().getProperty("accumulo.instanceName"), + getProperties().getProperty("accumulo.zooKeepers")); + try { + String principal = getProperties().getProperty("accumulo.username"); + AuthenticationToken token = + new PasswordToken(getProperties().getProperty("accumulo.password")); + connector = inst.getConnector(principal, token); + } catch (AccumuloException e) { + throw new DBException(e); + } catch (AccumuloSecurityException e) { + throw new DBException(e); + } + + pcFlag = getProperties().getProperty("accumulo.PC_FLAG", "none"); + if (pcFlag.equals(PC_PRODUCER) || pcFlag.equals(PC_CONSUMER)) { + System.out.println("*** YCSB Client is " + pcFlag); + String address = getProperties().getProperty("accumulo.PC_SERVER"); + String root = getProperties().getProperty("accumulo.PC_ROOT_IN_ZK"); + System.out + .println("*** PC_INFO(server:" + address + ";root=" + root + ")"); + q = new ZKProducerConsumer.Queue(address, root); + r = new Random(); + } + + if (pcFlag.equals(PC_CONSUMER)) { + hmKeyReads = new Hashtable<String, Long>(); + hmKeyNumReads = new Hashtable<String, Integer>(); + try { + keyNotification(null); + } catch (KeeperException e) { + throw new DBException(e); + } + } + } + + @Override + public void cleanup() throws DBException { + try { + if (bw != null) { + bw.close(); + } + } catch (MutationsRejectedException e) { + throw new DBException(e); + } + CleanUp.shutdownNow(); + } + + /** + * Commonly repeated functionality: Before doing any operation, make sure + * we're working on the correct table. If not, open the correct one. + * + * @param t + * The table to open. + */ + public void checkTable(String t) throws TableNotFoundException { + if (!table.equals(t)) { + getTable(t); + } + } + + /** + * Called when the user specifies a table that isn't the same as the existing + * table. Connect to it and if necessary, close our current connection. + * + * @param t + * The table to open. + */ + public void getTable(String t) throws TableNotFoundException { + if (bw != null) { // Close the existing writer if necessary. + try { + bw.close(); + } catch (MutationsRejectedException e) { + // Couldn't spit out the mutations we wanted. + // Ignore this for now. + System.err.println("MutationsRejectedException: " + e.getMessage()); + } + } + + BatchWriterConfig bwc = new BatchWriterConfig(); + bwc.setMaxLatency( + Long.parseLong(getProperties() + .getProperty("accumulo.batchWriterMaxLatency", "30000")), + TimeUnit.MILLISECONDS); + bwc.setMaxMemory(Long.parseLong( + getProperties().getProperty("accumulo.batchWriterSize", "100000"))); + bwc.setMaxWriteThreads(Integer.parseInt( + getProperties().getProperty("accumulo.batchWriterThreads", "1"))); + + bw = connector.createBatchWriter(table, bwc); + + // Create our scanners + singleScanner = connector.createScanner(table, Authorizations.EMPTY); + scanScanner = connector.createScanner(table, Authorizations.EMPTY); + + table = t; // Store the name of the table we have open. + } + + /** + * Gets a scanner from Accumulo over one row. + * + * @param row + * the row to scan + * @param fields + * the set of columns to scan + * @return an Accumulo {@link Scanner} bound to the given row and columns + */ + private Scanner getRow(Text row, Set<String> fields) { + singleScanner.clearColumns(); + singleScanner.setRange(new Range(row)); + if (fields != null) { + for (String field : fields) { + singleScanner.fetchColumn(colFam, new Text(field)); + } + } + return singleScanner; + } + + @Override + public int read(String t, String key, Set<String> fields, + HashMap<String, ByteIterator> result) { + + try { + checkTable(t); + } catch (TableNotFoundException e) { + System.err.println("Error trying to connect to Accumulo table." + e); + return SERVER_ERROR; + } + + try { + // Pick out the results we care about. + for (Entry<Key, Value> entry : getRow(new Text(key), null)) { + Value v = entry.getValue(); + byte[] buf = v.get(); + result.put(entry.getKey().getColumnQualifier().toString(), + new ByteArrayByteIterator(buf)); + } + } catch (Exception e) { + System.err.println("Error trying to reading Accumulo table" + key + e); + return SERVER_ERROR; + } + return OK; + + } + + @Override + public int scan(String t, String startkey, int recordcount, + Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { + try { + checkTable(t); + } catch (TableNotFoundException e) { + System.err.println("Error trying to connect to Accumulo table." + e); + return SERVER_ERROR; + } + + // There doesn't appear to be a way to create a range for a given + // LENGTH. Just start and end keys. So we'll do this the hard way for + // now: + // Just make the end 'infinity' and only read as much as we need. + scanScanner.clearColumns(); + scanScanner.setRange(new Range(new Text(startkey), null)); + + // Batch size is how many key/values to try to get per call. Here, I'm + // guessing that the number of keys in a row is equal to the number of + // fields + // we're interested in. + // We try to fetch one more so as to tell when we've run out of fields. + + if (fields != null) { + // And add each of them as fields we want. + for (String field : fields) { + scanScanner.fetchColumn(colFam, new Text(field)); + } + } // else - If no fields are provided, we assume one column/row. + + String rowKey = ""; + HashMap<String, ByteIterator> currentHM = null; + int count = 0; + + // Begin the iteration. + for (Entry<Key, Value> entry : scanScanner) { + // Check for a new row. + if (!rowKey.equals(entry.getKey().getRow().toString())) { + if (count++ == recordcount) { // Done reading the last row. + break; + } + rowKey = entry.getKey().getRow().toString(); + if (fields != null) { + // Initial Capacity for all keys. + currentHM = new HashMap<String, ByteIterator>(fields.size()); + } else { + // An empty result map. + currentHM = new HashMap<String, ByteIterator>(); + } + result.add(currentHM); + } + // Now add the key to the hashmap. + Value v = entry.getValue(); + byte[] buf = v.get(); + currentHM.put(entry.getKey().getColumnQualifier().toString(), + new ByteArrayByteIterator(buf)); + } + + return OK; + } + + @Override + public int update(String t, String key, + HashMap<String, ByteIterator> values) { + try { + checkTable(t); + } catch (TableNotFoundException e) { + System.err.println("Error trying to connect to Accumulo table." + e); + return SERVER_ERROR; + } + + Mutation mutInsert = new Mutation(new Text(key)); + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { + mutInsert.put(colFam, new Text(entry.getKey()), + System.currentTimeMillis(), new Value(entry.getValue().toArray())); + } + + try { + bw.addMutation(mutInsert); + // Distributed YCSB co-ordination: YCSB on a client produces the key + // to + // be stored in the shared queue in ZooKeeper. + if (pcFlag.equals(PC_PRODUCER)) { + if (r.nextFloat() < 0.01) { + keyNotification(key); + } + } + } catch (MutationsRejectedException e) { + System.err.println("Error performing update."); + e.printStackTrace(); + return SERVER_ERROR; + } catch (KeeperException e) { + System.err.println("Error notifying the Zookeeper Queue."); + e.printStackTrace(); + return SERVER_ERROR; + } + + return OK; + } + + @Override + public int insert(String t, String key, + HashMap<String, ByteIterator> values) { + return update(t, key, values); + } + + @Override + public int delete(String t, String key) { + try { + checkTable(t); + } catch (TableNotFoundException e) { + System.err.println("Error trying to connect to Accumulo table." + e); + return SERVER_ERROR; + } + + try { + deleteRow(new Text(key)); + } catch (MutationsRejectedException e) { + System.err.println("Error performing delete."); + e.printStackTrace(); + return SERVER_ERROR; + } catch (RuntimeException e) { + System.err.println("Error performing delete."); + e.printStackTrace(); + return SERVER_ERROR; + } + + return OK; + } + + // These functions are adapted from RowOperations.java: + private void deleteRow(Text row) throws MutationsRejectedException { + deleteRow(getRow(row, null)); + } + + /** + * Deletes a row, given a Scanner of JUST that row. + */ + private void deleteRow(Scanner scanner) throws MutationsRejectedException { + Mutation deleter = null; + // iterate through the keys + for (Entry<Key, Value> entry : scanner) { + // create a mutation for the row + if (deleter == null) { + deleter = new Mutation(entry.getKey().getRow()); + } + // the remove function adds the key with the delete flag set to true + deleter.putDelete(entry.getKey().getColumnFamily(), + entry.getKey().getColumnQualifier()); + } + + bw.addMutation(deleter); + } + + private void keyNotification(String key) throws KeeperException { + + if (pcFlag.equals(PC_PRODUCER)) { + try { + q.produce(key); + } catch (InterruptedException e) { + // Reset the interrupted state. + Thread.currentThread().interrupt(); + } + } else { + // XXX: do something better to keep the loop going (while??) + for (int i = 0; i < 10000000; i++) { + try { + String strKey = q.consume(); + + if (!hmKeyReads.containsKey(strKey) + && !hmKeyNumReads.containsKey(strKey)) { + hmKeyReads.put(strKey, new Long(System.currentTimeMillis())); + hmKeyNumReads.put(strKey, new Integer(1)); + } + + // YCSB Consumer will read the key that was fetched from the + // queue in ZooKeeper. + // (current way is kind of ugly but works, i think) + // TODO : Get table name from configuration or argument + String usertable = "usertable"; + HashSet<String> fields = new HashSet<String>(); + for (int j = 0; j < 9; j++) { + fields.add("field" + j); + } + HashMap<String, ByteIterator> result = + new HashMap<String, ByteIterator>(); + + int retval = read(usertable, strKey, fields, result); + // If the results are empty, the key is enqueued in + // Zookeeper + // and tried again, until the results are found. + if (result.size() == 0) { + q.produce(strKey); + int count = ((Integer) hmKeyNumReads.get(strKey)).intValue(); + hmKeyNumReads.put(strKey, new Integer(count + 1)); + } else { + if (((Integer) hmKeyNumReads.get(strKey)).intValue() > 1) { + long currTime = System.currentTimeMillis(); + long writeTime = ((Long) hmKeyReads.get(strKey)).longValue(); + System.out.println( + "Key=" + strKey + ";StartSearch=" + writeTime + ";EndSearch=" + + currTime + ";TimeLag=" + (currTime - writeTime)); + } + } + } catch (InterruptedException e) { + // Reset the interrupted state. + Thread.currentThread().interrupt(); + } + } + } + + } + + public int presplit(String t, String[] keys) + throws TableNotFoundException, AccumuloException, + AccumuloSecurityException { + TreeSet<Text> splits = new TreeSet<Text>(); + for (int i = 0; i < keys.length; i++) { + splits.add(new Text(keys[i])); + } + connector.tableOperations().addSplits(t, splits); + return OK; + } } diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java b/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java index 09741641dbeaf8ab444517c9c44027bf3a4a78db..2d42c79a5b9f090307b9b674f80a147419958d07 100644 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java +++ b/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java @@ -21,7 +21,6 @@ package com.yahoo.ycsb.db; import java.io.IOException; import java.util.List; - import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -30,111 +29,165 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; -// Implementing the PC Queue in ZooKeeper -// +/** + * Implementing the PC (Producer/Consumer) Queue in ZooKeeper. + */ public class ZKProducerConsumer implements Watcher { - static ZooKeeper zk = null; - static Integer mutex; - - String root; - - // Constructor that takes tha address of the ZK server - // - ZKProducerConsumer(String address) { - if(zk == null){ - try { - System.out.println("Starting ZK:"); - zk = new ZooKeeper(address, 3000, this); - mutex = new Integer(-1); - System.out.println("Finished starting ZK: " + zk); - } catch (IOException e) { - System.out.println(e.toString()); - zk = null; - } - } - //else mutex = new Integer(-1); + private static ZooKeeper zk = null; + private static Integer mutex; + + private String root; + + /** + * Constructor that takes the address of the ZK server. + * + * @param address + * The address of the ZK server. + */ + ZKProducerConsumer(String address) { + if (zk == null) { + try { + System.out.println("Starting ZK:"); + zk = new ZooKeeper(address, 3000, this); + mutex = new Integer(-1); + System.out.println("Finished starting ZK: " + zk); + } catch (IOException e) { + System.out.println(e.toString()); + zk = null; + } } + // else mutex = new Integer(-1); + } - synchronized public void process(WatchedEvent event) { - synchronized (mutex) { - //System.out.println("Process: " + event.getType()); - mutex.notify(); - } + public synchronized void process(WatchedEvent event) { + synchronized (mutex) { + // System.out.println("Process: " + event.getType()); + mutex.notify(); } - - - static public class QueueElement { - public String key; - public long writeTime; - - QueueElement(String key, long writeTime) { - this.key = key; - this.writeTime = writeTime; - } + } + + /** + * Returns the root. + * + * @return The root. + */ + protected String getRoot() { + return root; + } + + /** + * Sets the root. + * + * @param r + * The root value. + */ + protected void setRoot(String r) { + this.root = r; + } + + /** + * QueueElement a single queue element. No longer used. + * @deprecated No longer used. + */ + @Deprecated + public static class QueueElement { + private String key; + private long writeTime; + + QueueElement(String key, long writeTime) { + this.key = key; + this.writeTime = writeTime; } - - // Producer-Consumer queue - static public class Queue extends ZKProducerConsumer { - - // Constructor of producer-consumer queue - Queue(String address, String name) { - super(address); - this.root = name; - // Create ZK node name - if (zk != null) { - try { - Stat s = zk.exists(root, false); - if (s == null) { - zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - } catch (KeeperException e) { - System.out - .println("Keeper exception when instantiating queue: " - + e.toString()); - } catch (InterruptedException e) { - System.out.println("Interrupted exception"); - } - } + } + + /** + * Producer-Consumer queue. + */ + public static class Queue extends ZKProducerConsumer { + + /** + * Constructor of producer-consumer queue. + * + * @param address + * The Zookeeper server address. + * @param name + * The name of the root element for the queue. + */ + Queue(String address, String name) { + super(address); + this.setRoot(name); + // Create ZK node name + if (zk != null) { + try { + Stat s = zk.exists(getRoot(), false); + if (s == null) { + zk.create(getRoot(), new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + } catch (KeeperException e) { + System.out.println( + "Keeper exception when instantiating queue: " + e.toString()); + } catch (InterruptedException e) { + System.out.println("Interrupted exception"); } + } + } - // Producer calls this method to insert the key in the queue - // - boolean produce(String key) throws KeeperException, InterruptedException{ - byte[] value; - value = key.getBytes(); - zk.create(root + "/key", value, - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + /** + * Producer calls this method to insert the key in the queue. + * + * @param key + * The key to produce (add to the queue). + * @return True if the key was added. + * @throws KeeperException + * On a failure talking to zookeeper. + * @throws InterruptedException + * If the current thread is interrupted waiting for the zookeeper + * acknowledgement. + */ + // + boolean produce(String key) throws KeeperException, InterruptedException { + byte[] value; + value = key.getBytes(); + zk.create(getRoot() + "/key", value, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); - return true; - } + return true; + } - // Consumer calls this method to "wait" for the key to the available - // - String consume() throws KeeperException, InterruptedException { - String retvalue = null; - Stat stat = null; - - // Get the first element available - while (true) { - synchronized (mutex) { - List<String> list = zk.getChildren(root, true); - if (list.size() == 0) { - System.out.println("Going to wait"); - mutex.wait(); - } else { - String path = root+"/"+list.get(0); - byte[] b = zk.getData(path, false, stat); - retvalue = new String(b); - zk.delete(path, -1); - - return retvalue; - - } - } - } + /** + * Consumer calls this method to "wait" for the key to the available. + * + * @return The key to consumed (remove from the queue). + * @throws KeeperException + * On a failure talking to zookeeper. + * @throws InterruptedException + * If the current thread is interrupted waiting for the zookeeper + * acknowledgement. + */ + String consume() throws KeeperException, InterruptedException { + String retvalue = null; + Stat stat = null; + + // Get the first element available + while (true) { + synchronized (mutex) { + List<String> list = zk.getChildren(getRoot(), true); + if (list.size() == 0) { + System.out.println("Going to wait"); + mutex.wait(); + } else { + String path = getRoot() + "/" + list.get(0); + byte[] b = zk.getData(path, false, stat); + retvalue = new String(b); + zk.delete(path, -1); + + return retvalue; + + } } + } } + } } - diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java b/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..fbd7cf857468aa2e1dc5532b10ddd927e26432f2 --- /dev/null +++ b/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java @@ -0,0 +1,21 @@ +/** + * Copyright (c) 2015 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + * YCSB binding for <a href="https://accumulo.apache.org/">Accumulo</a>. + */ +package com.yahoo.ycsb.db; \ No newline at end of file