From 0b08f216c3f632817fd10924b019ee8e6f86d919 Mon Sep 17 00:00:00 2001 From: Sean Busbey <busbey@cloudera.com> Date: Mon, 5 Oct 2015 00:17:00 -0500 Subject: [PATCH] [accumulo] remove unused accumulo-specific distributed shared-work system. --- accumulo/README.md | 18 -- accumulo/pom.xml | 27 +-- .../ycsb/db/accumulo/AccumuloClient.java | 135 +----------- .../ycsb/db/accumulo/ZKProducerConsumer.java | 193 ------------------ .../yahoo/ycsb/db/accumulo/package-info.java | 6 +- .../java/com/yahoo/ycsb/db/package-info.java | 21 -- 6 files changed, 20 insertions(+), 380 deletions(-) delete mode 100644 accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/ZKProducerConsumer.java delete mode 100644 accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java diff --git a/accumulo/README.md b/accumulo/README.md index e2555833..fd9b4e8d 100644 --- a/accumulo/README.md +++ b/accumulo/README.md @@ -80,22 +80,4 @@ Run the workload test: - `accumulo.password` - The password for the user connecting to Accumulo. - No default value. - -- `accumulo.PC_FLAG` - - Provides support for distributed clients using ZooKeeper to manage the Producers and Consumers. - - If not set then the YCSB client will perform all work locally. - - Allowed values are: - - `producer` - - `consumer` - - Not set - - Default value is not set. - -- `accumulo.PC_SERVER` - - The set of ZooKeeper servers to use for the prioducers and consumers to communicate. - - Should contain a comma separated list of of hostname or hostname:port values. - - No default value. - -- `accumulo.PC_ROOT_IN_ZK` - - The root node in the ZooKeepers for the producers and consumers to communicate. - - No default value. diff --git a/accumulo/pom.xml b/accumulo/pom.xml index 2c69a60d..9aab8fe9 100644 --- a/accumulo/pom.xml +++ b/accumulo/pom.xml @@ -27,35 +27,20 @@ LICENSE file. </parent> <artifactId>accumulo-binding</artifactId> <name>Accumulo DB Binding</name> + <properties> + <!-- This should match up to the one from your Accumulo version --> + <hadoop.version>2.2.0</hadoop.version> + </properties> <dependencies> <dependency> <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-core</artifactId> <version>${accumulo.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.thrift</groupId> - <artifactId>thrift</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <version>3.3.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <version>0.20.203.0</version> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> </dependency> <dependency> <groupId>com.yahoo.ycsb</groupId> diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java index 51dcd5e5..9f3448d8 100644 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java +++ b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2011 YCSB++ project, 2014 YCSB contributors. + * Copyright (c) 2011 YCSB++ project, 2014-2016 YCSB contributors. * All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you @@ -42,16 +42,11 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.CleanUp; import org.apache.hadoop.io.Text; -import org.apache.zookeeper.KeeperException; import java.util.HashMap; -import java.util.HashSet; -import java.util.Hashtable; import java.util.Map; import java.util.Map.Entry; -import java.util.Random; import java.util.Set; -import java.util.TreeSet; import java.util.Vector; import java.util.concurrent.TimeUnit; @@ -68,14 +63,6 @@ public class AccumuloClient extends DB { private Scanner singleScanner = null; // A scanner for reads/deletes. private Scanner scanScanner = null; // A scanner for use by scan() - private static final String PC_PRODUCER = "producer"; - private static final String PC_CONSUMER = "consumer"; - private String pcFlag = ""; - private ZKProducerConsumer.Queue q = null; - private static Hashtable<String, Long> hmKeyReads = null; - private static Hashtable<String, Integer> hmKeyNumReads = null; - private Random r = null; - @Override public void init() throws DBException { colFam = new Text(getProperties().getProperty("accumulo.columnFamily")); @@ -94,25 +81,9 @@ public class AccumuloClient extends DB { throw new DBException(e); } - pcFlag = getProperties().getProperty("accumulo.PC_FLAG", "none"); - if (pcFlag.equals(PC_PRODUCER) || pcFlag.equals(PC_CONSUMER)) { - System.out.println("*** YCSB Client is " + pcFlag); - String address = getProperties().getProperty("accumulo.PC_SERVER"); - String root = getProperties().getProperty("accumulo.PC_ROOT_IN_ZK"); - System.out - .println("*** PC_INFO(server:" + address + ";root=" + root + ")"); - q = new ZKProducerConsumer.Queue(address, root); - r = new Random(); - } - - if (pcFlag.equals(PC_CONSUMER)) { - hmKeyReads = new Hashtable<String, Long>(); - hmKeyNumReads = new Hashtable<String, Integer>(); - try { - keyNotification(null); - } catch (KeeperException e) { - throw new DBException(e); - } + if (!(getProperties().getProperty("accumulo.pcFlag", "none").equals("none"))) { + System.err.println("Sorry, the ZK based producer/consumer implementation has been removed. " + + "Please see YCSB issue #416 for work on adding a general solution to coordinated work."); } } @@ -180,11 +151,9 @@ public class AccumuloClient extends DB { /** * Gets a scanner from Accumulo over one row. - * - * @param row - * the row to scan - * @param fields - * the set of columns to scan + * + * @param row the row to scan + * @param fields the set of columns to scan * @return an Accumulo {@link Scanner} bound to the given row and columns */ private Scanner getRow(Text row, Set<String> fields) { @@ -244,16 +213,17 @@ public class AccumuloClient extends DB { // Batch size is how many key/values to try to get per call. Here, I'm // guessing that the number of keys in a row is equal to the number of - // fields - // we're interested in. + // fields we're interested in. + // We try to fetch one more so as to tell when we've run out of fields. + // If no fields are provided, we assume one column/row. if (fields != null) { // And add each of them as fields we want. for (String field : fields) { scanScanner.fetchColumn(colFam, new Text(field)); } - } // else - If no fields are provided, we assume one column/row. + } String rowKey = ""; HashMap<String, ByteIterator> currentHM = null; @@ -304,22 +274,10 @@ public class AccumuloClient extends DB { try { bw.addMutation(mutInsert); - // Distributed YCSB co-ordination: YCSB on a client produces the key - // to - // be stored in the shared queue in ZooKeeper. - if (pcFlag.equals(PC_PRODUCER)) { - if (r.nextFloat() < 0.01) { - keyNotification(key); - } - } } catch (MutationsRejectedException e) { System.err.println("Error performing update."); e.printStackTrace(); return Status.ERROR; - } catch (KeeperException e) { - System.err.println("Error notifying the Zookeeper Queue."); - e.printStackTrace(); - return Status.ERROR; } return Status.OK; @@ -378,75 +336,4 @@ public class AccumuloClient extends DB { bw.addMutation(deleter); } - - private void keyNotification(String key) throws KeeperException { - - if (pcFlag.equals(PC_PRODUCER)) { - try { - q.produce(key); - } catch (InterruptedException e) { - // Reset the interrupted state. - Thread.currentThread().interrupt(); - } - } else { - // XXX: do something better to keep the loop going (while??) - for (int i = 0; i < 10000000; i++) { - try { - String strKey = q.consume(); - - if (!hmKeyReads.containsKey(strKey) - && !hmKeyNumReads.containsKey(strKey)) { - hmKeyReads.put(strKey, new Long(System.currentTimeMillis())); - hmKeyNumReads.put(strKey, new Integer(1)); - } - - // YCSB Consumer will read the key that was fetched from the - // queue in ZooKeeper. - // (current way is kind of ugly but works, i think) - // TODO : Get table name from configuration or argument - String usertable = "usertable"; - HashSet<String> fields = new HashSet<String>(); - for (int j = 0; j < 9; j++) { - fields.add("field" + j); - } - HashMap<String, ByteIterator> result = - new HashMap<String, ByteIterator>(); - - read(usertable, strKey, fields, result); - // If the results are empty, the key is enqueued in - // Zookeeper - // and tried again, until the results are found. - if (result.isEmpty()) { - q.produce(strKey); - int count = ((Integer) hmKeyNumReads.get(strKey)).intValue(); - hmKeyNumReads.put(strKey, new Integer(count + 1)); - } else { - if (((Integer) hmKeyNumReads.get(strKey)).intValue() > 1) { - long currTime = System.currentTimeMillis(); - long writeTime = ((Long) hmKeyReads.get(strKey)).longValue(); - System.out.println( - "Key=" + strKey + ";StartSearch=" + writeTime + ";EndSearch=" - + currTime + ";TimeLag=" + (currTime - writeTime)); - } - } - } catch (InterruptedException e) { - // Reset the interrupted state. - Thread.currentThread().interrupt(); - } - } - } - - } - - public Status presplit(String t, String[] keys) - throws TableNotFoundException, AccumuloException, - AccumuloSecurityException { - TreeSet<Text> splits = new TreeSet<Text>(); - for (int i = 0; i < keys.length; i++) { - splits.add(new Text(keys[i])); - } - connector.tableOperations().addSplits(t, splits); - return Status.OK; - } - } diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/ZKProducerConsumer.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/ZKProducerConsumer.java deleted file mode 100644 index 1b478f72..00000000 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/ZKProducerConsumer.java +++ /dev/null @@ -1,193 +0,0 @@ -/** - * Copyright (c) 2011 YCSB++ project, 2014 YCSB contributors. - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. - */ - -package com.yahoo.ycsb.db; - -import java.io.IOException; -import java.util.List; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; - -/** - * Implementing the PC (Producer/Consumer) Queue in ZooKeeper. - */ -public class ZKProducerConsumer implements Watcher { - - private static ZooKeeper zk = null; - private static Integer mutex; - - private String root; - - /** - * Constructor that takes the address of the ZK server. - * - * @param address - * The address of the ZK server. - */ - ZKProducerConsumer(String address) { - if (zk == null) { - try { - System.out.println("Starting ZK:"); - zk = new ZooKeeper(address, 3000, this); - mutex = new Integer(-1); - System.out.println("Finished starting ZK: " + zk); - } catch (IOException e) { - System.out.println(e.toString()); - zk = null; - } - } - // else mutex = new Integer(-1); - } - - public synchronized void process(WatchedEvent event) { - synchronized (mutex) { - // System.out.println("Process: " + event.getType()); - mutex.notify(); - } - } - - /** - * Returns the root. - * - * @return The root. - */ - protected String getRoot() { - return root; - } - - /** - * Sets the root. - * - * @param r - * The root value. - */ - protected void setRoot(String r) { - this.root = r; - } - - /** - * QueueElement a single queue element. No longer used. - * @deprecated No longer used. - */ - @Deprecated - public static class QueueElement { - private String key; - private long writeTime; - - QueueElement(String key, long writeTime) { - this.key = key; - this.writeTime = writeTime; - } - } - - /** - * Producer-Consumer queue. - */ - public static class Queue extends ZKProducerConsumer { - - /** - * Constructor of producer-consumer queue. - * - * @param address - * The Zookeeper server address. - * @param name - * The name of the root element for the queue. - */ - Queue(String address, String name) { - super(address); - this.setRoot(name); - // Create ZK node name - if (zk != null) { - try { - Stat s = zk.exists(getRoot(), false); - if (s == null) { - zk.create(getRoot(), new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - } catch (KeeperException e) { - System.out.println( - "Keeper exception when instantiating queue: " + e.toString()); - } catch (InterruptedException e) { - System.out.println("Interrupted exception"); - } - } - } - - /** - * Producer calls this method to insert the key in the queue. - * - * @param key - * The key to produce (add to the queue). - * @return True if the key was added. - * @throws KeeperException - * On a failure talking to zookeeper. - * @throws InterruptedException - * If the current thread is interrupted waiting for the zookeeper - * acknowledgement. - */ - // - boolean produce(String key) throws KeeperException, InterruptedException { - byte[] value; - value = key.getBytes(); - zk.create(getRoot() + "/key", value, Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL); - - return true; - } - - /** - * Consumer calls this method to "wait" for the key to the available. - * - * @return The key to consumed (remove from the queue). - * @throws KeeperException - * On a failure talking to zookeeper. - * @throws InterruptedException - * If the current thread is interrupted waiting for the zookeeper - * acknowledgement. - */ - String consume() throws KeeperException, InterruptedException { - String retvalue = null; - Stat stat = null; - - // Get the first element available - while (true) { - synchronized (mutex) { - List<String> list = zk.getChildren(getRoot(), true); - if (list.isEmpty()) { - System.out.println("Going to wait"); - mutex.wait(); - } else { - String path = getRoot() + "/" + list.get(0); - byte[] b = zk.getData(path, false, stat); - retvalue = new String(b); - zk.delete(path, -1); - - return retvalue; - - } - } - } - } - } -} diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java index 7c85b6ac..e38d200c 100644 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java +++ b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java @@ -1,5 +1,5 @@ -/* - * Copyright (c) 2015, YCSB contributors. All rights reserved. +/** + * Copyright (c) 2015 YCSB contributors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You @@ -16,7 +16,7 @@ */ /** - * The YCSB binding for <a href="https://accumulo.apache.org/">Apache Accumulo</a>. + * YCSB binding for <a href="https://accumulo.apache.org/">Apache Accumulo</a>. */ package com.yahoo.ycsb.db.accumulo; diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java b/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java deleted file mode 100644 index fbd7cf85..00000000 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Copyright (c) 2015 YCSB contributors. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. - */ - -/** - * YCSB binding for <a href="https://accumulo.apache.org/">Accumulo</a>. - */ -package com.yahoo.ycsb.db; \ No newline at end of file -- GitLab