diff --git a/accumulo/README.md b/accumulo/README.md
index fd9b4e8d7a3381c4e377669230022b022c4977a3..38e444cb7c522626d4d12a42a470f62fa83ba6f7 100644
--- a/accumulo/README.md
+++ b/accumulo/README.md
@@ -36,7 +36,42 @@ Git clone YCSB and compile:
     cd YCSB
     mvn -pl com.yahoo.ycsb:aerospike-binding -am clean package
 
-### 3. Load Data and Run Tests
+### 3. Create the Accumulo table
+
+By default, YCSB uses a table with the name "usertable". Users must create this table before loading
+data into Accumulo. For maximum Accumulo performance, the Accumulo table must be pre-split. A simple
+Ruby script, based on the HBase README, can generate adequate split-point. 10's of Tablets per
+TabletServer is a good starting point. Unless otherwise specified, the following commands should run
+on any version of Accumulo.
+
+    $ echo 'num_splits = 20; puts (1..num_splits).map {|i| "user#{1000+i*(9999-1000)/num_splits}"}' | ruby > /tmp/splits.txt
+    $ accumulo shell -u <user> -p <password> -e "createtable usertable"
+    $ accumulo shell -u <user> -p <password> -e "addsplits -t usertable -sf /tmp/splits.txt"
+    $ accumulo shell -u <user> -p <password> -e "config -t usertable -s table.cache.block.enable=true"
+
+Additionally, there are some other configuration properties which can increase performance. These
+can be set on the Accumulo table via the shell after it is created. Setting the table durability
+to `flush` relaxes the constraints on data durability during hard power-outages (avoids calls
+to fsync). Accumulo defaults table compression to `gzip` which is not particularly fast; `snappy`
+is a faster and similarly-efficient option. The mutation queue property controls how many writes
+that Accumulo will buffer in memory before performing a flush; this property should be set relative
+to the amount of JVM heap the TabletServers are given.
+
+Please note that the `table.durability` and `tserver.total.mutation.queue.max` properties only
+exists for >=Accumulo-1.7. There are no concise replacements for these properties in earlier versions.
+
+    accumulo> config -s table.durability=flush
+    accumulo> config -s tserver.total.mutation.queue.max=256M
+    accumulo> config -t usertable -s table.file.compress.type=snappy
+
+On repeated data loads, the following commands may be helpful to re-set the state of the table quickly.
+
+    accumulo> createtable tmp --copy-splits usertable --copy-config usertable
+    accumulo> deletetable --force usertable
+    accumulo> renametable tmp usertable
+    accumulo> compact --wait -t accumulo.metadata
+
+### 4. Load Data and Run Tests
 
 Load the data:
 
diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java
index 96b869e2b73b8cf9fcbcc5e9c2e3bd21154a73a8..41d6f7f6faa5d2ff9b1410fde55a83e6b4744c7d 100644
--- a/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java
+++ b/accumulo/src/main/java/com/yahoo/ycsb/db/accumulo/AccumuloClient.java
@@ -18,17 +18,25 @@
 
 package com.yahoo.ycsb.db.accumulo;
 
-import com.yahoo.ycsb.ByteArrayByteIterator;
-import com.yahoo.ycsb.ByteIterator;
-import com.yahoo.ycsb.DB;
-import com.yahoo.ycsb.DBException;
-import com.yahoo.ycsb.Status;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -39,16 +47,16 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.CleanUp;
 import org.apache.hadoop.io.Text;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.TimeUnit;
+import com.yahoo.ycsb.ByteArrayByteIterator;
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.DB;
+import com.yahoo.ycsb.DBException;
+import com.yahoo.ycsb.Status;
 
 /**
  * <a href="https://accumulo.apache.org/">Accumulo</a> binding for YCSB.
@@ -57,14 +65,11 @@ public class AccumuloClient extends DB {
 
   private ZooKeeperInstance inst;
   private Connector connector;
-  private String table = "";
-  private BatchWriter bw = null;
   private Text colFam = new Text("");
-  private Scanner singleScanner = null; // A scanner for reads/deletes.
-  private Scanner scanScanner = null; // A scanner for use by scan()
+  private byte[] colFamBytes = new byte[0];
+  private final ConcurrentHashMap<String, BatchWriter> writers = new ConcurrentHashMap<>();
 
   static {
-
     Runtime.getRuntime().addShutdownHook(new Thread() {
       @Override
       public void run() {
@@ -76,6 +81,7 @@ public class AccumuloClient extends DB {
   @Override
   public void init() throws DBException {
     colFam = new Text(getProperties().getProperty("accumulo.columnFamily"));
+    colFamBytes = colFam.toString().getBytes(UTF_8);
 
     inst = new ZooKeeperInstance(
         getProperties().getProperty("accumulo.instanceName"),
@@ -85,9 +91,7 @@ public class AccumuloClient extends DB {
       AuthenticationToken token =
           new PasswordToken(getProperties().getProperty("accumulo.password"));
       connector = inst.getConnector(principal, token);
-    } catch (AccumuloException e) {
-      throw new DBException(e);
-    } catch (AccumuloSecurityException e) {
+    } catch (AccumuloException | AccumuloSecurityException e) {
       throw new DBException(e);
     }
 
@@ -100,45 +104,56 @@ public class AccumuloClient extends DB {
   @Override
   public void cleanup() throws DBException {
     try {
-      if (bw != null) {
-        bw.close();
+      Iterator<BatchWriter> iterator = writers.values().iterator();
+      while (iterator.hasNext()) {
+        BatchWriter writer = iterator.next();
+        writer.close();
+        iterator.remove();
       }
     } catch (MutationsRejectedException e) {
       throw new DBException(e);
     }
   }
 
-  /**
-   * Commonly repeated functionality: Before doing any operation, make sure
-   * we're working on the correct table. If not, open the correct one.
-   * 
-   * @param t
-   *          The table to open.
-   */
-  public void checkTable(String t) throws TableNotFoundException {
-    if (!table.equals(t)) {
-      getTable(t);
-    }
-  }
-
   /**
    * Called when the user specifies a table that isn't the same as the existing
    * table. Connect to it and if necessary, close our current connection.
    * 
-   * @param t
+   * @param table
    *          The table to open.
    */
-  public void getTable(String t) throws TableNotFoundException {
-    if (bw != null) { // Close the existing writer if necessary.
-      try {
-        bw.close();
-      } catch (MutationsRejectedException e) {
-        // Couldn't spit out the mutations we wanted.
-        // Ignore this for now.
-        System.err.println("MutationsRejectedException: " + e.getMessage());
+  public BatchWriter getWriter(String table) throws TableNotFoundException {
+    // tl;dr We're paying a cost for the ConcurrentHashMap here to deal with the DB api.
+    // We know that YCSB is really only ever going to send us data for one table, so using
+    // a concurrent data structure is overkill (especially in such a hot code path).
+    // However, the impact seems to be relatively negligible in trivial local tests and it's
+    // "more correct" WRT to the API.
+    BatchWriter writer = writers.get(table);
+    if (null == writer) {
+      BatchWriter newWriter = createBatchWriter(table);
+      BatchWriter oldWriter = writers.putIfAbsent(table, newWriter);
+      // Someone beat us to creating a BatchWriter for this table, use their BatchWriters
+      if (null != oldWriter) {
+        try {
+          // Make sure to clean up our new batchwriter!
+          newWriter.close();
+        } catch (MutationsRejectedException e) {
+          throw new RuntimeException(e);
+        }
+        writer = oldWriter;
+      } else {
+        writer = newWriter;
       }
     }
+    return writer;
+  }
 
+  /**
+   * Creates a BatchWriter with the expected configuration.
+   *
+   * @param table The table to write to
+   */
+  private BatchWriter createBatchWriter(String table) throws TableNotFoundException {
     BatchWriterConfig bwc = new BatchWriterConfig();
     bwc.setMaxLatency(
         Long.parseLong(getProperties()
@@ -146,16 +161,15 @@ public class AccumuloClient extends DB {
         TimeUnit.MILLISECONDS);
     bwc.setMaxMemory(Long.parseLong(
         getProperties().getProperty("accumulo.batchWriterSize", "100000")));
-    bwc.setMaxWriteThreads(Integer.parseInt(
-        getProperties().getProperty("accumulo.batchWriterThreads", "1")));
-
-    bw = connector.createBatchWriter(t, bwc);
-
-    // Create our scanners
-    singleScanner = connector.createScanner(t, Authorizations.EMPTY);
-    scanScanner = connector.createScanner(t, Authorizations.EMPTY);
-
-    table = t; // Store the name of the table we have open.
+    final String numThreadsValue = getProperties().getProperty("accumulo.batchWriterThreads");
+    // Try to saturate the client machine.
+    int numThreads = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
+    if (null != numThreadsValue) {
+      numThreads = Integer.parseInt(numThreadsValue);
+    }
+    System.err.println("Using " + numThreads + " threads to write data");
+    bwc.setMaxWriteThreads(numThreads);
+    return connector.createBatchWriter(table, bwc);
   }
 
   /**
@@ -165,120 +179,120 @@ public class AccumuloClient extends DB {
    * @param fields the set of columns to scan
    * @return an Accumulo {@link Scanner} bound to the given row and columns
    */
-  private Scanner getRow(Text row, Set<String> fields) {
-    singleScanner.clearColumns();
-    singleScanner.setRange(new Range(row));
+  private Scanner getRow(String table, Text row, Set<String> fields) throws TableNotFoundException {
+    Scanner scanner = connector.createScanner(table, Authorizations.EMPTY);
+    scanner.setRange(new Range(row));
     if (fields != null) {
       for (String field : fields) {
-        singleScanner.fetchColumn(colFam, new Text(field));
+        scanner.fetchColumn(colFam, new Text(field));
       }
     }
-    return singleScanner;
+    return scanner;
   }
 
   @Override
-  public Status read(String t, String key, Set<String> fields,
+  public Status read(String table, String key, Set<String> fields,
       HashMap<String, ByteIterator> result) {
 
+    Scanner scanner = null;
     try {
-      checkTable(t);
-    } catch (TableNotFoundException e) {
-      System.err.println("Error trying to connect to Accumulo table." + e);
-      return Status.ERROR;
-    }
-
-    try {
+      scanner = getRow(table, new Text(key), null);
       // Pick out the results we care about.
-      for (Entry<Key, Value> entry : getRow(new Text(key), null)) {
+      final Text cq = new Text();
+      for (Entry<Key, Value> entry : scanner) {
+        entry.getKey().getColumnQualifier(cq);
         Value v = entry.getValue();
         byte[] buf = v.get();
-        result.put(entry.getKey().getColumnQualifier().toString(),
+        result.put(cq.toString(),
             new ByteArrayByteIterator(buf));
       }
     } catch (Exception e) {
-      System.err.println("Error trying to reading Accumulo table" + key + e);
+      System.err.println("Error trying to reading Accumulo table " + table + " " + key);
+      e.printStackTrace();
       return Status.ERROR;
+    } finally {
+      if (null != scanner) {
+        scanner.close();
+      }
     }
     return Status.OK;
 
   }
 
   @Override
-  public Status scan(String t, String startkey, int recordcount,
+  public Status scan(String table, String startkey, int recordcount,
       Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
-    try {
-      checkTable(t);
-    } catch (TableNotFoundException e) {
-      System.err.println("Error trying to connect to Accumulo table." + e);
-      return Status.ERROR;
-    }
-
-    // There doesn't appear to be a way to create a range for a given
-    // LENGTH. Just start and end keys. So we'll do this the hard way for
-    // now:
     // Just make the end 'infinity' and only read as much as we need.
-    scanScanner.clearColumns();
-    scanScanner.setRange(new Range(new Text(startkey), null));
-
-    // Batch size is how many key/values to try to get per call. Here, I'm
-    // guessing that the number of keys in a row is equal to the number of
-    // fields we're interested in.
-
-    // We try to fetch one more so as to tell when we've run out of fields.
-
-    // If no fields are provided, we assume one column/row.
-    if (fields != null) {
-      // And add each of them as fields we want.
-      for (String field : fields) {
-        scanScanner.fetchColumn(colFam, new Text(field));
+    Scanner scanner = null;
+    try {
+      scanner = connector.createScanner(table, Authorizations.EMPTY);
+      scanner.setRange(new Range(new Text(startkey), null));
+
+      // Have Accumulo send us complete rows, serialized in a single Key-Value pair
+      IteratorSetting cfg = new IteratorSetting(100, WholeRowIterator.class);
+      scanner.addScanIterator(cfg);
+
+      // If no fields are provided, we assume one column/row.
+      if (fields != null) {
+        // And add each of them as fields we want.
+        for (String field : fields) {
+          scanner.fetchColumn(colFam, new Text(field));
+        }
       }
-    }
 
-    String rowKey = "";
-    HashMap<String, ByteIterator> currentHM = null;
-    int count = 0;
-
-    // Begin the iteration.
-    for (Entry<Key, Value> entry : scanScanner) {
-      // Check for a new row.
-      if (!rowKey.equals(entry.getKey().getRow().toString())) {
+      int count = 0;
+      for (Entry<Key, Value> entry : scanner) {
+        // Deserialize the row
+        SortedMap<Key, Value> row = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue());
+        HashMap<String, ByteIterator> rowData;
+        if (null != fields) {
+          rowData = new HashMap<>(fields.size());
+        } else {
+          rowData = new HashMap<>();
+        }
+        result.add(rowData);
+        // Parse the data in the row, avoid unnecessary Text object creation
+        final Text cq = new Text();
+        for (Entry<Key, Value> rowEntry : row.entrySet()) {
+          rowEntry.getKey().getColumnQualifier(cq);
+          rowData.put(cq.toString(), new ByteArrayByteIterator(rowEntry.getValue().get()));
+        }
         if (count++ == recordcount) { // Done reading the last row.
           break;
         }
-        rowKey = entry.getKey().getRow().toString();
-        if (fields != null) {
-          // Initial Capacity for all keys.
-          currentHM = new HashMap<String, ByteIterator>(fields.size());
-        } else {
-          // An empty result map.
-          currentHM = new HashMap<String, ByteIterator>();
-        }
-        result.add(currentHM);
       }
-      // Now add the key to the hashmap.
-      Value v = entry.getValue();
-      byte[] buf = v.get();
-      currentHM.put(entry.getKey().getColumnQualifier().toString(),
-          new ByteArrayByteIterator(buf));
+    } catch (TableNotFoundException e) {
+      System.err.println("Error trying to connect to Accumulo table.");
+      e.printStackTrace();
+      return Status.ERROR;
+    } catch (IOException e) {
+      System.err.println("Error deserializing data from Accumulo.");
+      e.printStackTrace();
+      return Status.ERROR;
+    } finally {
+      if (null != scanner) {
+        scanner.close();
+      }
     }
 
     return Status.OK;
   }
 
   @Override
-  public Status update(String t, String key,
+  public Status update(String table, String key,
       HashMap<String, ByteIterator> values) {
+    BatchWriter bw = null;
     try {
-      checkTable(t);
+      bw = getWriter(table);
     } catch (TableNotFoundException e) {
-      System.err.println("Error trying to connect to Accumulo table." + e);
+      System.err.println("Error opening batch writer to Accumulo table " + table);
+      e.printStackTrace();
       return Status.ERROR;
     }
 
-    Mutation mutInsert = new Mutation(new Text(key));
+    Mutation mutInsert = new Mutation(key.getBytes(UTF_8));
     for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
-      mutInsert.put(colFam, new Text(entry.getKey()),
-          System.currentTimeMillis(), new Value(entry.getValue().toArray()));
+      mutInsert.put(colFamBytes, entry.getKey().getBytes(UTF_8), entry.getValue().toArray());
     }
 
     try {
@@ -289,7 +303,7 @@ public class AccumuloClient extends DB {
       return Status.ERROR;
     }
 
-    return Status.OK;
+    return Status.BATCHED_OK;
   }
 
   @Override
@@ -299,17 +313,19 @@ public class AccumuloClient extends DB {
   }
 
   @Override
-  public Status delete(String t, String key) {
+  public Status delete(String table, String key) {
+    BatchWriter bw;
     try {
-      checkTable(t);
+      bw = getWriter(table);
     } catch (TableNotFoundException e) {
-      System.err.println("Error trying to connect to Accumulo table." + e);
+      System.err.println("Error trying to connect to Accumulo table.");
+      e.printStackTrace();
       return Status.ERROR;
     }
 
     try {
-      deleteRow(new Text(key));
-    } catch (MutationsRejectedException e) {
+      deleteRow(table, new Text(key), bw);
+    } catch (TableNotFoundException | MutationsRejectedException e) {
       System.err.println("Error performing delete.");
       e.printStackTrace();
       return Status.ERROR;
@@ -323,24 +339,31 @@ public class AccumuloClient extends DB {
   }
 
   // These functions are adapted from RowOperations.java:
-  private void deleteRow(Text row) throws MutationsRejectedException {
-    deleteRow(getRow(row, null));
+  private void deleteRow(String table, Text row, BatchWriter bw) throws MutationsRejectedException,
+          TableNotFoundException {
+    // TODO Use a batchDeleter instead
+    deleteRow(getRow(table, row, null), bw);
   }
 
   /**
    * Deletes a row, given a Scanner of JUST that row.
    */
-  private void deleteRow(Scanner scanner) throws MutationsRejectedException {
+  private void deleteRow(Scanner scanner, BatchWriter bw) throws MutationsRejectedException {
     Mutation deleter = null;
     // iterate through the keys
+    final Text row = new Text();
+    final Text cf = new Text();
+    final Text cq = new Text();
     for (Entry<Key, Value> entry : scanner) {
       // create a mutation for the row
       if (deleter == null) {
-        deleter = new Mutation(entry.getKey().getRow());
+        entry.getKey().getRow(row);
+        deleter = new Mutation(row);
       }
+      entry.getKey().getColumnFamily(cf);
+      entry.getKey().getColumnQualifier(cq);
       // the remove function adds the key with the delete flag set to true
-      deleter.putDelete(entry.getKey().getColumnFamily(),
-          entry.getKey().getColumnQualifier());
+      deleter.putDelete(cf, cq);
     }
 
     bw.addMutation(deleter);