diff --git a/accumulo/README.md b/accumulo/README.md
index e2555833ad1f8bd9e96529f8c057949b373a51be..fd9b4e8d7a3381c4e377669230022b022c4977a3 100644
--- a/accumulo/README.md
+++ b/accumulo/README.md
@@ -80,22 +80,4 @@ Run the workload test:
 - `accumulo.password`
   - The password for the user connecting to Accumulo.
   - No default value.
- 
-- `accumulo.PC_FLAG`
-  - Provides support for distributed clients using ZooKeeper to manage the Producers and Consumers.
-  - If not set then the YCSB client will perform all work locally.
-  - Allowed values are:
-    - `producer`
-    - `consumer`
-    - Not set
-  - Default value is not set.
-
-- `accumulo.PC_SERVER`
-  - The set of ZooKeeper servers to use for the prioducers and consumers to communicate.
-  - Should contain a comma separated list of of hostname or hostname:port values.
-  - No default value. 
-
-- `accumulo.PC_ROOT_IN_ZK`
-  - The root node in the ZooKeepers for the producers and consumers to communicate.
-  - No default value. 
 
diff --git a/accumulo/pom.xml b/accumulo/pom.xml
index 2c69a60db261a423af253d729481f64199d5a108..be8b9409150c2af1cbf77e96c5286c1b74e24a47 100644
--- a/accumulo/pom.xml
+++ b/accumulo/pom.xml
@@ -27,35 +27,21 @@ LICENSE file.
   </parent>
   <artifactId>accumulo-binding</artifactId>
   <name>Accumulo DB Binding</name>
+  <properties>
+    <!-- This should match the Hadoop version used by your Accumulo release -->
+    <hadoop.version>2.2.0</hadoop.version>
+  </properties>
   <dependencies>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-core</artifactId>
       <version>${accumulo.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.thrift</groupId>
-          <artifactId>thrift</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.zookeeper</groupId>
-          <artifactId>zookeeper</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <version>3.3.1</version>
     </dependency>
+    <!-- Needed for org.apache.hadoop.io.Text, used by the Accumulo API :( -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-core</artifactId>
-      <version>0.20.203.0</version>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
     </dependency>
     <dependency>
       <groupId>com.yahoo.ycsb</groupId>
@@ -63,6 +49,25 @@ LICENSE file.
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.12</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-minicluster</artifactId>
+      <version>${accumulo.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <!-- Needed directly only in tests, but pulled in transitively
+         at runtime by accumulo, hadoop, and thrift. -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.13</version>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
@@ -87,5 +92,14 @@ LICENSE file.
         </executions>
       </plugin>
     </plugins>
+    <testResources>
+      <testResource>
+        <directory>../workloads</directory>
+        <targetPath>workloads</targetPath>
+      </testResource>
+      <testResource>
+        <directory>src/test/resources</directory>
+      </testResource>
+    </testResources>
   </build>
 </project>
diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java b/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java
deleted file mode 100644
index 1b478f72b738e45440def94939146babb7984775..0000000000000000000000000000000000000000
--- a/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Copyright (c) 2011 YCSB++ project, 2014 YCSB contributors.
- * All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License. You
- * may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License. See accompanying
- * LICENSE file.
- */
-
-package com.yahoo.ycsb.db;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * Implementing the PC (Producer/Consumer) Queue in ZooKeeper.
- */
-public class ZKProducerConsumer implements Watcher {
-
-  private static ZooKeeper zk = null;
-  private static Integer mutex;
-
-  private String root;
-
-  /**
-   * Constructor that takes the address of the ZK server.
-   * 
-   * @param address
-   *          The address of the ZK server.
-   */
-  ZKProducerConsumer(String address) {
-    if (zk == null) {
-      try {
-        System.out.println("Starting ZK:");
-        zk = new ZooKeeper(address, 3000, this);
-        mutex = new Integer(-1);
-        System.out.println("Finished starting ZK: " + zk);
-      } catch (IOException e) {
-        System.out.println(e.toString());
-        zk = null;
-      }
-    }
-    // else mutex = new Integer(-1);
-  }
-
-  public synchronized void process(WatchedEvent event) {
-    synchronized (mutex) {
-      // System.out.println("Process: " + event.getType());
-      mutex.notify();
-    }
-  }
-
-  /**
-   * Returns the root.
-   *
-   * @return The root.
-   */
-  protected String getRoot() {
-    return root;
-  }
-
-  /**
-   * Sets the root.
-   * 
-   * @param r
-   *          The root value.
-   */
-  protected void setRoot(String r) {
-    this.root = r;
-  }
-
-  /**
-   * QueueElement a single queue element.  No longer used.
-   * @deprecated No longer used.
-   */
-  @Deprecated
-  public static class QueueElement {
-    private String key;
-    private long writeTime;
-
-    QueueElement(String key, long writeTime) {
-      this.key = key;
-      this.writeTime = writeTime;
-    }
-  }
-
-  /**
-   * Producer-Consumer queue.
-   */
-  public static class Queue extends ZKProducerConsumer {
-
-    /**
-     * Constructor of producer-consumer queue.
-     * 
-     * @param address
-     *          The Zookeeper server address.
-     * @param name
-     *          The name of the root element for the queue.
-     */
-    Queue(String address, String name) {
-      super(address);
-      this.setRoot(name);
-      // Create ZK node name
-      if (zk != null) {
-        try {
-          Stat s = zk.exists(getRoot(), false);
-          if (s == null) {
-            zk.create(getRoot(), new byte[0], Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
-          }
-        } catch (KeeperException e) {
-          System.out.println(
-              "Keeper exception when instantiating queue: " + e.toString());
-        } catch (InterruptedException e) {
-          System.out.println("Interrupted exception");
-        }
-      }
-    }
-
-    /**
-     * Producer calls this method to insert the key in the queue.
-     * 
-     * @param key
-     *          The key to produce (add to the queue).
-     * @return True if the key was added.
-     * @throws KeeperException
-     *           On a failure talking to zookeeper.
-     * @throws InterruptedException
-     *           If the current thread is interrupted waiting for the zookeeper
-     *           acknowledgement.
-     */
-    //
-    boolean produce(String key) throws KeeperException, InterruptedException {
-      byte[] value;
-      value = key.getBytes();
-      zk.create(getRoot() + "/key", value, Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT_SEQUENTIAL);
-
-      return true;
-    }
-
-    /**
-     * Consumer calls this method to "wait" for the key to the available.
-     * 
-     * @return The key to consumed (remove from the queue).
-     * @throws KeeperException
-     *           On a failure talking to zookeeper.
-     * @throws InterruptedException
-     *           If the current thread is interrupted waiting for the zookeeper
-     *           acknowledgement.
-     */
-    String consume() throws KeeperException, InterruptedException {
-      String retvalue = null;
-      Stat stat = null;
-
-      // Get the first element available
-      while (true) {
-        synchronized (mutex) {
-          List<String> list = zk.getChildren(getRoot(), true);
-          if (list.isEmpty()) {
-            System.out.println("Going to wait");
-            mutex.wait();
-          } else {
-            String path = getRoot() + "/" + list.get(0);
-            byte[] b = zk.getData(path, false, stat);
-            retvalue = new String(b);
-            zk.delete(path, -1);
-
-            return retvalue;
-
-          }
-        }
-      }
-    }
-  }
-}
diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java
similarity index 69%
rename from accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java
rename to accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java
index 6886307cda431eaf2d421780a91a25c07f375bb1..96b869e2b73b8cf9fcbcc5e9c2e3bd21154a73a8 100644
--- a/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java
+++ b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2011 YCSB++ project, 2014 YCSB contributors.
+ * Copyright (c) 2011 YCSB++ project, 2014-2016 YCSB contributors.
  * All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you
@@ -16,7 +16,7 @@
  * LICENSE file.
  */
 
-package com.yahoo.ycsb.db;
+package com.yahoo.ycsb.db.accumulo;
 
 import com.yahoo.ycsb.ByteArrayByteIterator;
 import com.yahoo.ycsb.ByteIterator;
@@ -42,16 +42,11 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.CleanUp;
 import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException;
 
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Hashtable;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.Vector;
 import java.util.concurrent.TimeUnit;
 
@@ -68,13 +63,15 @@ public class AccumuloClient extends DB {
   private Scanner singleScanner = null; // A scanner for reads/deletes.
   private Scanner scanScanner = null; // A scanner for use by scan()
 
-  private static final String PC_PRODUCER = "producer";
-  private static final String PC_CONSUMER = "consumer";
-  private String pcFlag = "";
-  private ZKProducerConsumer.Queue q = null;
-  private static Hashtable<String, Long> hmKeyReads = null;
-  private static Hashtable<String, Integer> hmKeyNumReads = null;
-  private Random r = null;
+  static {
+    // Shut down Accumulo's static resources once at JVM exit instead of in each client's cleanup().
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        CleanUp.shutdownNow();
+      }
+    });
+  }
 
   @Override
   public void init() throws DBException {
@@ -94,25 +91,9 @@ public class AccumuloClient extends DB {
       throw new DBException(e);
     }
 
-    pcFlag = getProperties().getProperty("accumulo.PC_FLAG", "none");
-    if (pcFlag.equals(PC_PRODUCER) || pcFlag.equals(PC_CONSUMER)) {
-      System.out.println("*** YCSB Client is " + pcFlag);
-      String address = getProperties().getProperty("accumulo.PC_SERVER");
-      String root = getProperties().getProperty("accumulo.PC_ROOT_IN_ZK");
-      System.out
-          .println("*** PC_INFO(server:" + address + ";root=" + root + ")");
-      q = new ZKProducerConsumer.Queue(address, root);
-      r = new Random();
-    }
-
-    if (pcFlag.equals(PC_CONSUMER)) {
-      hmKeyReads = new Hashtable<String, Long>();
-      hmKeyNumReads = new Hashtable<String, Integer>();
-      try {
-        keyNotification(null);
-      } catch (KeeperException e) {
-        throw new DBException(e);
-      }
+    if (!(getProperties().getProperty("accumulo.PC_FLAG", "none").equals("none"))) {
+      System.err.println("Sorry, the ZooKeeper-based producer/consumer implementation has been removed. " +
+          "Please see YCSB issue #416 for work on a general solution for coordinated work.");
     }
   }
 
@@ -125,7 +106,6 @@ public class AccumuloClient extends DB {
     } catch (MutationsRejectedException e) {
       throw new DBException(e);
     }
-    CleanUp.shutdownNow();
   }
 
   /**
@@ -180,11 +160,9 @@ public class AccumuloClient extends DB {
 
   /**
    * Gets a scanner from Accumulo over one row.
-   * 
-   * @param row
-   *          the row to scan
-   * @param fields
-   *          the set of columns to scan
+   *
+   * @param row the row to scan
+   * @param fields the set of columns to scan
    * @return an Accumulo {@link Scanner} bound to the given row and columns
    */
   private Scanner getRow(Text row, Set<String> fields) {
@@ -244,16 +222,17 @@ public class AccumuloClient extends DB {
 
     // Batch size is how many key/values to try to get per call. Here, I'm
     // guessing that the number of keys in a row is equal to the number of
-    // fields
-    // we're interested in.
+    // fields we're interested in.
+
     // We try to fetch one more so as to tell when we've run out of fields.
 
+    // If no fields are provided, we assume one column/row.
     if (fields != null) {
       // And add each of them as fields we want.
       for (String field : fields) {
         scanScanner.fetchColumn(colFam, new Text(field));
       }
-    } // else - If no fields are provided, we assume one column/row.
+    }
 
     String rowKey = "";
     HashMap<String, ByteIterator> currentHM = null;
@@ -304,22 +283,10 @@ public class AccumuloClient extends DB {
 
     try {
       bw.addMutation(mutInsert);
-      // Distributed YCSB co-ordination: YCSB on a client produces the key
-      // to
-      // be stored in the shared queue in ZooKeeper.
-      if (pcFlag.equals(PC_PRODUCER)) {
-        if (r.nextFloat() < 0.01) {
-          keyNotification(key);
-        }
-      }
     } catch (MutationsRejectedException e) {
       System.err.println("Error performing update.");
       e.printStackTrace();
       return Status.ERROR;
-    } catch (KeeperException e) {
-      System.err.println("Error notifying the Zookeeper Queue.");
-      e.printStackTrace();
-      return Status.ERROR;
     }
 
     return Status.OK;
@@ -378,75 +345,4 @@ public class AccumuloClient extends DB {
 
     bw.addMutation(deleter);
   }
-
-  private void keyNotification(String key) throws KeeperException {
-
-    if (pcFlag.equals(PC_PRODUCER)) {
-      try {
-        q.produce(key);
-      } catch (InterruptedException e) {
-        // Reset the interrupted state.
-        Thread.currentThread().interrupt();
-      }
-    } else {
-      // XXX: do something better to keep the loop going (while??)
-      for (int i = 0; i < 10000000; i++) {
-        try {
-          String strKey = q.consume();
-
-          if (!hmKeyReads.containsKey(strKey)
-              && !hmKeyNumReads.containsKey(strKey)) {
-            hmKeyReads.put(strKey, new Long(System.currentTimeMillis()));
-            hmKeyNumReads.put(strKey, new Integer(1));
-          }
-
-          // YCSB Consumer will read the key that was fetched from the
-          // queue in ZooKeeper.
-          // (current way is kind of ugly but works, i think)
-          // TODO : Get table name from configuration or argument
-          String usertable = "usertable";
-          HashSet<String> fields = new HashSet<String>();
-          for (int j = 0; j < 9; j++) {
-            fields.add("field" + j);
-          }
-          HashMap<String, ByteIterator> result =
-              new HashMap<String, ByteIterator>();
-
-          read(usertable, strKey, fields, result);
-          // If the results are empty, the key is enqueued in
-          // Zookeeper
-          // and tried again, until the results are found.
-          if (result.isEmpty()) {
-            q.produce(strKey);
-            int count = ((Integer) hmKeyNumReads.get(strKey)).intValue();
-            hmKeyNumReads.put(strKey, new Integer(count + 1));
-          } else {
-            if (((Integer) hmKeyNumReads.get(strKey)).intValue() > 1) {
-              long currTime = System.currentTimeMillis();
-              long writeTime = ((Long) hmKeyReads.get(strKey)).longValue();
-              System.out.println(
-                  "Key=" + strKey + ";StartSearch=" + writeTime + ";EndSearch="
-                      + currTime + ";TimeLag=" + (currTime - writeTime));
-            }
-          }
-        } catch (InterruptedException e) {
-          // Reset the interrupted state.
-          Thread.currentThread().interrupt();
-        }
-      }
-    }
-
-  }
-
-  public Status presplit(String t, String[] keys)
-      throws TableNotFoundException, AccumuloException,
-      AccumuloSecurityException {
-    TreeSet<Text> splits = new TreeSet<Text>();
-    for (int i = 0; i < keys.length; i++) {
-      splits.add(new Text(keys[i]));
-    }
-    connector.tableOperations().addSplits(t, splits);   
-    return Status.OK;
-  }
-
 }
diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java
similarity index 85%
rename from accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java
rename to accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java
index fbd7cf857468aa2e1dc5532b10ddd927e26432f2..e38d200c774c03138f6ba8f642b2d2fe9bebc578 100644
--- a/accumulo/src/main/java/com/yahoo/ycsb/db/package-info.java
+++ b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/package-info.java
@@ -16,6 +16,7 @@
  */
 
 /**
- * YCSB binding for <a href="https://accumulo.apache.org/">Accumulo</a>.
+ * YCSB binding for <a href="https://accumulo.apache.org/">Apache Accumulo</a>.
  */
-package com.yahoo.ycsb.db;
\ No newline at end of file
+package com.yahoo.ycsb.db.accumulo;
+
diff --git a/accumulo/src/test/java/com/yahoo/ycsb/db/accumulo/AccumuloTest.java b/accumulo/src/test/java/com/yahoo/ycsb/db/accumulo/AccumuloTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..00fd02e0dd8e66c5910a5c80bd3338f3e04a2182
--- /dev/null
+++ b/accumulo/src/test/java/com/yahoo/ycsb/db/accumulo/AccumuloTest.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright (c) 2016 YCSB contributors.
+ * All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb.db.accumulo;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import com.yahoo.ycsb.Workload;
+import com.yahoo.ycsb.DB;
+import com.yahoo.ycsb.measurements.Measurements;
+import com.yahoo.ycsb.workloads.CoreWorkload;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Use an Accumulo MiniCluster to test out basic workload operations with
+ * the Accumulo binding.
+ */
+public class AccumuloTest {
+  private static final Logger LOG = LoggerFactory.getLogger(AccumuloTest.class);
+  private static final int INSERT_COUNT = 2000;
+  private static final int TRANSACTION_COUNT = 2000;
+
+  @ClassRule
+  public static TemporaryFolder workingDir = new TemporaryFolder();
+  @Rule
+  public TestName test = new TestName();
+
+  private static MiniAccumuloCluster cluster;
+  private static Properties properties;
+  private Workload workload;
+  private DB client;
+  private Properties workloadProps;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniAccumuloCluster(workingDir.newFolder("accumulo").getAbsoluteFile(), "protectyaneck");
+    LOG.debug("starting minicluster");
+    cluster.start();
+    LOG.debug("creating connection for admin operations.");
+    // set up the table and user
+    final Connector admin = cluster.getConnector("root", "protectyaneck");
+    admin.tableOperations().create(CoreWorkload.TABLENAME_PROPERTY_DEFAULT);
+    admin.securityOperations().createLocalUser("ycsb", new PasswordToken("protectyaneck"));
+    admin.securityOperations().grantTablePermission("ycsb", CoreWorkload.TABLENAME_PROPERTY_DEFAULT, TablePermission.READ);
+    admin.securityOperations().grantTablePermission("ycsb", CoreWorkload.TABLENAME_PROPERTY_DEFAULT, TablePermission.WRITE);
+
+    // set properties the binding will read
+    properties = new Properties();
+    properties.setProperty("accumulo.zooKeepers", cluster.getZooKeepers());
+    properties.setProperty("accumulo.instanceName", cluster.getInstanceName());
+    properties.setProperty("accumulo.columnFamily", "family");
+    properties.setProperty("accumulo.username", "ycsb");
+    properties.setProperty("accumulo.password", "protectyaneck");
+    // cut down the batch writer timeout so that writes will push through.
+    properties.setProperty("accumulo.batchWriterMaxLatency", "4");
+    // set these explicitly to the defaults at the time we're compiled, since they'll be inlined in our class.
+    properties.setProperty(CoreWorkload.TABLENAME_PROPERTY, CoreWorkload.TABLENAME_PROPERTY_DEFAULT);
+    properties.setProperty(CoreWorkload.FIELD_COUNT_PROPERTY, CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT);
+    properties.setProperty(CoreWorkload.INSERT_ORDER_PROPERTY, "ordered");
+  }
+
+  @AfterClass
+  public static void clusterCleanup() throws Exception {
+    if (cluster != null) {
+      LOG.debug("shutting down minicluster");
+      cluster.stop();
+      cluster = null;
+    }
+  }
+
+  @Before
+  public void client() throws Exception {
+
+    LOG.debug("Loading workload properties for {}", test.getMethodName());
+    workloadProps = new Properties();
+    workloadProps.load(getClass().getResourceAsStream("/workloads/" + test.getMethodName()));
+
+    for (String prop : properties.stringPropertyNames()) {
+      workloadProps.setProperty(prop, properties.getProperty(prop));
+    }
+
+    // TODO we need a better test rig for 'run this ycsb workload'
+    LOG.debug("initializing measurements and workload");
+    Measurements.setProperties(workloadProps);
+    workload = new CoreWorkload();
+    workload.init(workloadProps);
+
+    LOG.debug("initializing client");
+    client = new AccumuloClient();
+    client.setProperties(workloadProps);
+    client.init();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    if (client != null) {
+      LOG.debug("cleaning up client");
+      client.cleanup();
+      client = null;
+    }
+    if (workload != null) {
+      LOG.debug("cleaning up workload");
+      workload.cleanup();
+    }
+  }
+
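+  /** Wipe the table between tests so each workload starts from an empty table. */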
+  @After
+  public void truncateTable() throws Exception {
+    if (cluster != null) {
+      LOG.debug("truncating table {}", CoreWorkload.TABLENAME_PROPERTY_DEFAULT);
+      final Connector admin = cluster.getConnector("root", "protectyaneck");
+      admin.tableOperations().deleteRows(CoreWorkload.TABLENAME_PROPERTY_DEFAULT, null, null);
+    }
+  }
+
+  @Test
+  public void workloada() throws Exception {
+    runWorkload();
+  }
+
+  @Test
+  public void workloadb() throws Exception {
+    runWorkload();
+  }
+
+  @Test
+  public void workloadc() throws Exception {
+    runWorkload();
+  }
+
+  @Test
+  public void workloadd() throws Exception {
+    runWorkload();
+  }
+
+  @Test
+  public void workloade() throws Exception {
+    runWorkload();
+  }
+
+  /**
+   * Go through one complete workload cycle.
+   * <ol>
+   *   <li>initialize thread-specific state
+   *   <li>load the workload dataset
+   *   <li>run workload transactions
+   * </ol>
+   */
+  private void runWorkload() throws Exception {
+    final Object state = workload.initThread(workloadProps, 0, 0);
+    LOG.debug("load");
+    for (int i = 0; i < INSERT_COUNT; i++) {
+      assertTrue("insert failed.", workload.doInsert(client, state));
+    }
+    // Ensure we wait long enough for the batch writer to flush
+    // TODO accumulo client should be flushing per insert by default.
+    Thread.sleep(2000);
+    LOG.debug("verify number of cells");
+    final Scanner scanner = cluster.getConnector("root", "protectyaneck").createScanner(CoreWorkload.TABLENAME_PROPERTY_DEFAULT, Authorizations.EMPTY);
+    int count = 0;
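+    // CoreWorkload writes one cell per field, so expect fieldcount * INSERT_COUNT cells in total.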
+    for (Entry<Key, Value> entry : scanner) {
+      count++;
+    }
+    assertEquals("Didn't get enough total cells.", (Integer.valueOf(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT) * INSERT_COUNT), count);
+    LOG.debug("run");
+    for (int i = 0; i < TRANSACTION_COUNT; i++) {
+      assertTrue("transaction failed.", workload.doTransaction(client, state));
+    }
+  }
+}
diff --git a/accumulo/src/test/resources/log4j.properties b/accumulo/src/test/resources/log4j.properties
new file mode 100644
index 0000000000000000000000000000000000000000..e03d54a31a8e20a926c453d6057a41d128ed1f44
--- /dev/null
+++ b/accumulo/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Copyright (c) 2015 YCSB contributors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you
+# may not use this file except in compliance with the License. You
+# may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License. See accompanying
+# LICENSE file.
+#
+
+# Root logger option
+log4j.rootLogger=INFO, stderr
+
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.target=System.err
+log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
+log4j.appender.stderr.layout.conversionPattern=%d{yyyy/MM/dd HH:mm:ss} %-5p %c %x - %m%n
+
+# Per-package log levels: keep the binding at INFO, quiet ZooKeeper and Accumulo internals
+log4j.logger.com.yahoo.ycsb.db.accumulo=INFO
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.apache.accumulo=WARN
diff --git a/bin/ycsb b/bin/ycsb
index 1f0834d1be8bf72b6eae59054350cd5c29ff2c84..90010a7d6118b5f914089dd5a6d0d8b7fed70ccc 100755
--- a/bin/ycsb
+++ b/bin/ycsb
@@ -49,7 +49,7 @@ COMMANDS = {
 }
 
 DATABASES = {
-    "accumulo"     : "com.yahoo.ycsb.db.AccumuloClient",
+    "accumulo"     : "com.yahoo.ycsb.db.accumulo.AccumuloClient",
     "aerospike"    : "com.yahoo.ycsb.db.AerospikeClient",
     "basic"        : "com.yahoo.ycsb.BasicDB",
     "cassandra-7"  : "com.yahoo.ycsb.db.CassandraClient7",