diff --git a/accumulo/README.md b/accumulo/README.md
index e2555833ad1f8bd9e96529f8c057949b373a51be..fd9b4e8d7a3381c4e377669230022b022c4977a3 100644
--- a/accumulo/README.md
+++ b/accumulo/README.md
@@ -80,22 +80,4 @@ Run the workload test:
 - `accumulo.password`
   - The password for the user connecting to Accumulo.
   - No default value.
- 
-- `accumulo.PC_FLAG`
-  - Provides support for distributed clients using ZooKeeper to manage the Producers and Consumers.
-  - If not set then the YCSB client will perform all work locally.
-  - Allowed values are:
-    - `producer`
-    - `consumer`
-    - Not set
-  - Default value is not set.
-
-- `accumulo.PC_SERVER`
-  - The set of ZooKeeper servers to use for the prioducers and consumers to communicate.
-  - Should contain a comma separated list of of hostname or hostname:port values.
-  - No default value. 
-
-- `accumulo.PC_ROOT_IN_ZK`
-  - The root node in the ZooKeepers for the producers and consumers to communicate.
-  - No default value. 
 
diff --git a/accumulo/pom.xml b/accumulo/pom.xml
index 2c69a60db261a423af253d729481f64199d5a108..9aab8fe94c819045d9ae645a6d9c880c3e2387f7 100644
--- a/accumulo/pom.xml
+++ b/accumulo/pom.xml
@@ -27,35 +27,20 @@ LICENSE file.
   </parent>
   <artifactId>accumulo-binding</artifactId>
   <name>Accumulo DB Binding</name>
+  <properties>
+    <!-- This should match up to the one from your Accumulo version -->
+    <hadoop.version>2.2.0</hadoop.version>
+  </properties>
   <dependencies>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-core</artifactId>
       <version>${accumulo.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.thrift</groupId>
-          <artifactId>thrift</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.zookeeper</groupId>
-          <artifactId>zookeeper</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <version>3.3.1</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-core</artifactId>
-      <version>0.20.203.0</version>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
     </dependency>
     <dependency>
       <groupId>com.yahoo.ycsb</groupId>
diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java
index 51dcd5e56b3ee4163ad0be0e22ac1efe4de6f0e3..9f3448d855b0045184c82b34ea56e3cd41cf661b 100644
--- a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java
+++ b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2011 YCSB++ project, 2014 YCSB contributors.
+ * Copyright (c) 2011 YCSB++ project, 2014-2016 YCSB contributors.
  * All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you
@@ -42,16 +42,11 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.CleanUp;
 import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException;
 
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Hashtable;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.Vector;
 import java.util.concurrent.TimeUnit;
 
@@ -68,14 +63,6 @@ public class AccumuloClient extends DB {
   private Scanner singleScanner = null; // A scanner for reads/deletes.
   private Scanner scanScanner = null; // A scanner for use by scan()
 
-  private static final String PC_PRODUCER = "producer";
-  private static final String PC_CONSUMER = "consumer";
-  private String pcFlag = "";
-  private ZKProducerConsumer.Queue q = null;
-  private static Hashtable<String, Long> hmKeyReads = null;
-  private static Hashtable<String, Integer> hmKeyNumReads = null;
-  private Random r = null;
-
   @Override
   public void init() throws DBException {
     colFam = new Text(getProperties().getProperty("accumulo.columnFamily"));
@@ -94,25 +81,9 @@ public class AccumuloClient extends DB {
       throw new DBException(e);
     }
 
-    pcFlag = getProperties().getProperty("accumulo.PC_FLAG", "none");
-    if (pcFlag.equals(PC_PRODUCER) || pcFlag.equals(PC_CONSUMER)) {
-      System.out.println("*** YCSB Client is " + pcFlag);
-      String address = getProperties().getProperty("accumulo.PC_SERVER");
-      String root = getProperties().getProperty("accumulo.PC_ROOT_IN_ZK");
-      System.out
-          .println("*** PC_INFO(server:" + address + ";root=" + root + ")");
-      q = new ZKProducerConsumer.Queue(address, root);
-      r = new Random();
-    }
-
-    if (pcFlag.equals(PC_CONSUMER)) {
-      hmKeyReads = new Hashtable<String, Long>();
-      hmKeyNumReads = new Hashtable<String, Integer>();
-      try {
-        keyNotification(null);
-      } catch (KeeperException e) {
-        throw new DBException(e);
-      }
+    if (!(getProperties().getProperty("accumulo.pcFlag", "none").equals("none"))) {
+      System.err.println("Sorry, the ZK based producer/consumer implementation has been removed. " +
+          "Please see YCSB issue #416 for work on adding a general solution to coordinated work.");
     }
   }
 
@@ -180,11 +151,9 @@ public class AccumuloClient extends DB {
 
   /**
    * Gets a scanner from Accumulo over one row.
-   * 
-   * @param row
-   *          the row to scan
-   * @param fields
-   *          the set of columns to scan
+   *
+   * @param row the row to scan
+   * @param fields the set of columns to scan
    * @return an Accumulo {@link Scanner} bound to the given row and columns
    */
   private Scanner getRow(Text row, Set<String> fields) {
@@ -244,16 +213,17 @@ public class AccumuloClient extends DB {
 
     // Batch size is how many key/values to try to get per call. Here, I'm
     // guessing that the number of keys in a row is equal to the number of
-    // fields
-    // we're interested in.
+    // fields we're interested in.
+
     // We try to fetch one more so as to tell when we've run out of fields.
 
+    // If no fields are provided, we assume one column/row.
     if (fields != null) {
       // And add each of them as fields we want.
       for (String field : fields) {
         scanScanner.fetchColumn(colFam, new Text(field));
       }
-    } // else - If no fields are provided, we assume one column/row.
+    }
 
     String rowKey = "";
     HashMap<String, ByteIterator> currentHM = null;
@@ -304,22 +274,10 @@ public class AccumuloClient extends DB {
 
     try {
       bw.addMutation(mutInsert);
-      // Distributed YCSB co-ordination: YCSB on a client produces the key
-      // to
-      // be stored in the shared queue in ZooKeeper.
-      if (pcFlag.equals(PC_PRODUCER)) {
-        if (r.nextFloat() < 0.01) {
-          keyNotification(key);
-        }
-      }
     } catch (MutationsRejectedException e) {
       System.err.println("Error performing update.");
       e.printStackTrace();
       return Status.ERROR;
-    } catch (KeeperException e) {
-      System.err.println("Error notifying the Zookeeper Queue.");
-      e.printStackTrace();
-      return Status.ERROR;
     }
 
     return Status.OK;
@@ -378,75 +336,4 @@ public class AccumuloClient extends DB {
 
     bw.addMutation(deleter);
   }
-
-  private void keyNotification(String key) throws KeeperException {
-
-    if (pcFlag.equals(PC_PRODUCER)) {
-      try {
-        q.produce(key);
-      } catch (InterruptedException e) {
-        // Reset the interrupted state.
-        Thread.currentThread().interrupt();
-      }
-    } else {
-      // XXX: do something better to keep the loop going (while??)
-      for (int i = 0; i < 10000000; i++) {
-        try {
-          String strKey = q.consume();
-
-          if (!hmKeyReads.containsKey(strKey)
-              && !hmKeyNumReads.containsKey(strKey)) {
-            hmKeyReads.put(strKey, new Long(System.currentTimeMillis()));
-            hmKeyNumReads.put(strKey, new Integer(1));
-          }
-
-          // YCSB Consumer will read the key that was fetched from the
-          // queue in ZooKeeper.
-          // (current way is kind of ugly but works, i think)
-          // TODO : Get table name from configuration or argument
-          String usertable = "usertable";
-          HashSet<String> fields = new HashSet<String>();
-          for (int j = 0; j < 9; j++) {
-            fields.add("field" + j);
-          }
-          HashMap<String, ByteIterator> result =
-              new HashMap<String, ByteIterator>();
-
-          read(usertable, strKey, fields, result);
-          // If the results are empty, the key is enqueued in
-          // Zookeeper
-          // and tried again, until the results are found.
-          if (result.isEmpty()) {
-            q.produce(strKey);
-            int count = ((Integer) hmKeyNumReads.get(strKey)).intValue();
-            hmKeyNumReads.put(strKey, new Integer(count + 1));
-          } else {
-            if (((Integer) hmKeyNumReads.get(strKey)).intValue() > 1) {
-              long currTime = System.currentTimeMillis();
-              long writeTime = ((Long) hmKeyReads.get(strKey)).longValue();
-              System.out.println(
-                  "Key=" + strKey + ";StartSearch=" + writeTime + ";EndSearch="
-                      + currTime + ";TimeLag=" + (currTime - writeTime));
-            }
-          }
-        } catch (InterruptedException e) {
-          // Reset the interrupted state.
-          Thread.currentThread().interrupt();
-        }
-      }
-    }
-
-  }
-
-  public Status presplit(String t, String[] keys)
-      throws TableNotFoundException, AccumuloException,
-      AccumuloSecurityException {
-    TreeSet<Text> splits = new TreeSet<Text>();
-    for (int i = 0; i < keys.length; i++) {
-      splits.add(new Text(keys[i]));
-    }
-    connector.tableOperations().addSplits(t, splits);   
-    return Status.OK;
-  }
-
 }
diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/ZKProducerConsumer.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/ZKProducerConsumer.java
deleted file mode 100644
index 1b478f72b738e45440def94939146babb7984775..0000000000000000000000000000000000000000
--- a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/ZKProducerConsumer.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Copyright (c) 2011 YCSB++ project, 2014 YCSB contributors.
- * All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License. You
- * may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License. See accompanying
- * LICENSE file.
- */
-
-package com.yahoo.ycsb.db;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * Implementing the PC (Producer/Consumer) Queue in ZooKeeper.
- */
-public class ZKProducerConsumer implements Watcher {
-
-  private static ZooKeeper zk = null;
-  private static Integer mutex;
-
-  private String root;
-
-  /**
-   * Constructor that takes the address of the ZK server.
-   * 
-   * @param address
-   *          The address of the ZK server.
-   */
-  ZKProducerConsumer(String address) {
-    if (zk == null) {
-      try {
-        System.out.println("Starting ZK:");
-        zk = new ZooKeeper(address, 3000, this);
-        mutex = new Integer(-1);
-        System.out.println("Finished starting ZK: " + zk);
-      } catch (IOException e) {
-        System.out.println(e.toString());
-        zk = null;
-      }
-    }
-    // else mutex = new Integer(-1);
-  }
-
-  public synchronized void process(WatchedEvent event) {
-    synchronized (mutex) {
-      // System.out.println("Process: " + event.getType());
-      mutex.notify();
-    }
-  }
-
-  /**
-   * Returns the root.
-   *
-   * @return The root.
-   */
-  protected String getRoot() {
-    return root;
-  }
-
-  /**
-   * Sets the root.
-   * 
-   * @param r
-   *          The root value.
-   */
-  protected void setRoot(String r) {
-    this.root = r;
-  }
-
-  /**
-   * QueueElement a single queue element.  No longer used.
-   * @deprecated No longer used.
-   */
-  @Deprecated
-  public static class QueueElement {
-    private String key;
-    private long writeTime;
-
-    QueueElement(String key, long writeTime) {
-      this.key = key;
-      this.writeTime = writeTime;
-    }
-  }
-
-  /**
-   * Producer-Consumer queue.
-   */
-  public static class Queue extends ZKProducerConsumer {
-
-    /**
-     * Constructor of producer-consumer queue.
-     * 
-     * @param address
-     *          The Zookeeper server address.
-     * @param name
-     *          The name of the root element for the queue.
-     */
-    Queue(String address, String name) {
-      super(address);
-      this.setRoot(name);
-      // Create ZK node name
-      if (zk != null) {
-        try {
-          Stat s = zk.exists(getRoot(), false);
-          if (s == null) {
-            zk.create(getRoot(), new byte[0], Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
-          }
-        } catch (KeeperException e) {
-          System.out.println(
-              "Keeper exception when instantiating queue: " + e.toString());
-        } catch (InterruptedException e) {
-          System.out.println("Interrupted exception");
-        }
-      }
-    }
-
-    /**
-     * Producer calls this method to insert the key in the queue.
-     * 
-     * @param key
-     *          The key to produce (add to the queue).
-     * @return True if the key was added.
-     * @throws KeeperException
-     *           On a failure talking to zookeeper.
-     * @throws InterruptedException
-     *           If the current thread is interrupted waiting for the zookeeper
-     *           acknowledgement.
-     */
-    //
-    boolean produce(String key) throws KeeperException, InterruptedException {
-      byte[] value;
-      value = key.getBytes();
-      zk.create(getRoot() + "/key", value, Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT_SEQUENTIAL);
-
-      return true;
-    }
-
-    /**
-     * Consumer calls this method to "wait" for the key to the available.
-     * 
-     * @return The key to consumed (remove from the queue).
-     * @throws KeeperException
-     *           On a failure talking to zookeeper.
-     * @throws InterruptedException
-     *           If the current thread is interrupted waiting for the zookeeper
-     *           acknowledgement.
-     */
-    String consume() throws KeeperException, InterruptedException {
-      String retvalue = null;
-      Stat stat = null;
-
-      // Get the first element available
-      while (true) {
-        synchronized (mutex) {
-          List<String> list = zk.getChildren(getRoot(), true);
-          if (list.isEmpty()) {
-            System.out.println("Going to wait");
-            mutex.wait();
-          } else {
-            String path = getRoot() + "/" + list.get(0);
-            byte[] b = zk.getData(path, false, stat);
-            retvalue = new String(b);
-            zk.delete(path, -1);
-
-            return retvalue;
-
-          }
-        }
-      }
-    }
-  }
-}
diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java
index 7c85b6ac902f6f08e5921b443f65091b374a9b46..e38d200c774c03138f6ba8f642b2d2fe9bebc578 100644
--- a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java
+++ b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java
@@ -1,5 +1,5 @@
-/*
- * Copyright (c) 2015, YCSB contributors. All rights reserved.
+/**
+ * Copyright (c) 2015 YCSB contributors. All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you
  * may not use this file except in compliance with the License. You
@@ -16,7 +16,7 @@
  */
 
 /**
- * The YCSB binding for <a href="https://accumulo.apache.org/">Apache Accumulo</a>.
+ * YCSB binding for <a href="https://accumulo.apache.org/">Apache Accumulo</a>.
  */
 package com.yahoo.ycsb.db.accumulo;
 
diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java b/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java
deleted file mode 100644
index fbd7cf857468aa2e1dc5532b10ddd927e26432f2..0000000000000000000000000000000000000000
--- a/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Copyright (c) 2015 YCSB contributors. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License. You
- * may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License. See accompanying
- * LICENSE file.
- */
-
-/**
- * YCSB binding for <a href="https://accumulo.apache.org/">Accumulo</a>.
- */
-package com.yahoo.ycsb.db;
\ No newline at end of file