diff --git a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java
index 0109c519558056ab0d762b64b2350dda6fa16b7c..f68ca76f5aa0c0d096098619853268398040906c 100644
--- a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java
+++ b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * Copyright (c) 2010 Yahoo! Inc., 2016 YCSB contributors. All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you
  * may not use this file except in compliance with the License. You
@@ -184,7 +184,7 @@ public class DBWrapper extends DB
   private void measure(String op, Status result, long intendedStartTimeNanos,
       long startTimeNanos, long endTimeNanos) {
     String measurementName = op;
-    if (result != Status.OK) {
+    if (result == null || !result.isOk()) {
       if (this.reportLatencyForEachError ||
           this.latencyTrackedErrors.contains(result.getName())) {
         measurementName = op + "-" + result.getName();
diff --git a/core/src/main/java/com/yahoo/ycsb/Status.java b/core/src/main/java/com/yahoo/ycsb/Status.java
index 5e5b3a887d3f3bb6c871c950977ab2585c3fc28f..5dbf88ac8070a49a9817685595aefb680d05eb35 100644
--- a/core/src/main/java/com/yahoo/ycsb/Status.java
+++ b/core/src/main/java/com/yahoo/ycsb/Status.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2015 YCSB contributors All rights reserved.
+ * Copyright (c) 2016 YCSB contributors All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you
  * may not use this file except in compliance with the License. You
@@ -79,6 +79,14 @@ public class Status {
     return true;
   }
 
+  /**
+   * Is {@code this} a passing state for the operation: {@link Status#OK} or {@link Status#BATCHED_OK}.
+   * @return true if the operation is successful, false otherwise
+   */
+  public boolean isOk() {
+    return this == OK || this == BATCHED_OK;
+  }
+
   public static final Status OK = new Status("OK", "The operation completed successfully.");
   public static final Status ERROR = new Status("ERROR", "The operation failed.");
   public static final Status NOT_FOUND = new Status("NOT_FOUND", "The requested record was not found.");
diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
index b9ff7e734cf3b3224511de51e4f1b16253f00ff7..422dc05475d1534a717ab63077c7a6525ce52c5d 100644
--- a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
+++ b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Vector;
 
 
@@ -590,7 +591,7 @@ public class CoreWorkload extends Workload {
     int numOfRetries = 0;
     do {
       status = db.insert(table, dbkey, values);
-      if (status == Status.OK) {
+      if (null != status && status.isOk()) {
         break;
       }
       // Retry if configured. Without retrying, the load process will fail
@@ -614,7 +615,7 @@ public class CoreWorkload extends Workload {
       }
     } while (true);
 
-    return (status == Status.OK);
+    return null != status && status.isOk();
   }
 
   /**
diff --git a/core/src/test/java/com/yahoo/ycsb/TestStatus.java b/core/src/test/java/com/yahoo/ycsb/TestStatus.java
new file mode 100644
index 0000000000000000000000000000000000000000..efcc7239c86a3afdc87d5ab279c1894e06b46311
--- /dev/null
+++ b/core/src/test/java/com/yahoo/ycsb/TestStatus.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (c) 2016 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package com.yahoo.ycsb;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Test class for {@link Status}.
+ */
+public class TestStatus {
+
+  @Test
+  public void testAcceptableStatus() {
+    assertTrue(Status.OK.isOk());
+    assertTrue(Status.BATCHED_OK.isOk());
+    assertFalse(Status.BAD_REQUEST.isOk());
+    assertFalse(Status.ERROR.isOk());
+    assertFalse(Status.FORBIDDEN.isOk());
+    assertFalse(Status.NOT_FOUND.isOk());
+    assertFalse(Status.NOT_IMPLEMENTED.isOk());
+    assertFalse(Status.SERVICE_UNAVAILABLE.isOk());
+    assertFalse(Status.UNEXPECTED_STATE.isOk());
+  }
+}
diff --git a/jdbc/README.md b/jdbc/README.md
index f74ff54f019a1d576205463ef97c4b4a7cf12688..b6e10926fb555ff25103f2296e9b1292417b4f7c 100644
--- a/jdbc/README.md
+++ b/jdbc/README.md
@@ -101,6 +101,8 @@ db.passwd=admin								# Password for the connection.
 db.batchsize=1000             # The batch size for doing batched inserts. Defaults to 0. Set to >0 to use batching.
 jdbc.fetchsize=10							# The JDBC fetch size hinted to the driver.
 jdbc.autocommit=true						# The JDBC connection auto-commit property for the driver.
+jdbc.batchupdateapi=false     # Use addBatch()/executeBatch() JDBC methods instead of executeUpdate() for writes (default: false)
+db.batchsize=1000             # The number of rows to be batched before commit (or executeBatch() when jdbc.batchupdateapi=true)
 ```
 
 Please refer to https://github.com/brianfrankcooper/YCSB/wiki/Core-Properties for all other YCSB core properties.
diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java
index 80dd338ef221361bb918c44e21910ec934bd13bd..aeaf431fdcc3afef665b8bbee7d4a57242c2db70 100644
--- a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java
+++ b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2010 - 2016 Yahoo! Inc. All rights reserved.
+ * Copyright (c) 2010 - 2016 Yahoo! Inc., 2016 YCSB contributors. All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you
  * may not use this file except in compliance with the License. You
@@ -26,6 +26,7 @@ import java.sql.*;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import com.yahoo.ycsb.db.flavors.DBFlavor;
 
 /**
  * A class that wraps a JDBC compliant database to allow it to be interfaced
@@ -64,6 +65,8 @@ public class JdbcDBClient extends DB {
   /** The JDBC connection auto-commit property for the driver. */
   public static final String JDBC_AUTO_COMMIT = "jdbc.autocommit";
 
+  public static final String JDBC_BATCH_UPDATES = "jdbc.batchupdateapi";
+
   /** The name of the property for the number of fields in a record. */
   public static final String FIELD_COUNT_PROPERTY = "fieldcount";
 
@@ -84,9 +87,14 @@ public class JdbcDBClient extends DB {
   private Properties props;
   private int jdbcFetchSize;
   private int batchSize;
+  private boolean autoCommit;
+  private boolean batchUpdates;
   private static final String DEFAULT_PROP = "";
   private ConcurrentMap<StatementType, PreparedStatement> cachedStatements;
   private long numRowsInBatch = 0;
+  /** DB flavor defines DB-specific syntax and behavior for the
+   * particular database. Current database flavors are: {default, phoenix} */
+  private DBFlavor dbFlavor;
 
   /**
    * Ordered field information for insert and update statements.
@@ -109,87 +117,6 @@ public class JdbcDBClient extends DB {
     }
   }
 
-  /**
-   * The statement type for the prepared statements.
-   */
-  private static class StatementType {
-
-    enum Type {
-      INSERT(1), DELETE(2), READ(3), UPDATE(4), SCAN(5);
-
-      private final int internalType;
-
-      private Type(int type) {
-        internalType = type;
-      }
-
-      int getHashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + internalType;
-        return result;
-      }
-    }
-
-    private Type type;
-    private int shardIndex;
-    private int numFields;
-    private String tableName;
-    private String fieldString;
-
-    StatementType(Type type, String tableName, int numFields, String fieldString, int shardIndex) {
-      this.type = type;
-      this.tableName = tableName;
-      this.numFields = numFields;
-      this.fieldString = fieldString;
-      this.shardIndex = shardIndex;
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + numFields + 100 * shardIndex;
-      result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
-      result = prime * result + ((type == null) ? 0 : type.getHashCode());
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (obj == null) {
-        return false;
-      }
-      if (getClass() != obj.getClass()) {
-        return false;
-      }
-      StatementType other = (StatementType) obj;
-      if (numFields != other.numFields) {
-        return false;
-      }
-      if (shardIndex != other.shardIndex) {
-        return false;
-      }
-      if (tableName == null) {
-        if (other.tableName != null) {
-          return false;
-        }
-      } else if (!tableName.equals(other.tableName)) {
-        return false;
-      }
-      if (type != other.type) {
-        return false;
-      }
-      if (!fieldString.equals(other.fieldString)) {
-        return false;
-      }
-      return true;
-    }
-  }
-
   /**
    * For the given key, returns what shard contains data for this key.
    *
@@ -214,6 +141,9 @@ public class JdbcDBClient extends DB {
 
   private void cleanupAllConnections() throws SQLException {
     for (Connection conn : conns) {
+      if (!autoCommit) {
+        conn.commit();
+      }
       conn.close();
     }
   }
@@ -232,6 +162,15 @@ public class JdbcDBClient extends DB {
     return -1;
   }
 
+  /** Returns parsed boolean value from the properties if set, otherwise returns defaultVal. */
+  private static boolean getBoolProperty(Properties props, String key, boolean defaultVal) {
+    String valueStr = props.getProperty(key);
+    if (valueStr != null) {
+      return Boolean.parseBoolean(valueStr);
+    }
+    return defaultVal;
+  }
+
   @Override
   public void init() throws DBException {
     if (initialized) {
@@ -247,8 +186,8 @@ public class JdbcDBClient extends DB {
     this.jdbcFetchSize = getIntProperty(props, JDBC_FETCH_SIZE);
     this.batchSize = getIntProperty(props, DB_BATCH_SIZE);
 
-    String autoCommitStr = props.getProperty(JDBC_AUTO_COMMIT, Boolean.TRUE.toString());
-    Boolean autoCommit = Boolean.parseBoolean(autoCommitStr);
+    this.autoCommit = getBoolProperty(props, JDBC_AUTO_COMMIT, true);
+    this.batchUpdates = getBoolProperty(props, JDBC_BATCH_UPDATES, false);
 
     try {
       if (driver != null) {
@@ -256,7 +195,8 @@ public class JdbcDBClient extends DB {
       }
       int shardCount = 0;
       conns = new ArrayList<Connection>(3);
-      for (String url : urls.split(",")) {
+      final String[] urlArr = urls.split(",");
+      for (String url : urlArr) {
         System.out.println("Adding shard node URL: " + url);
         Connection conn = DriverManager.getConnection(url, user, passwd);
 
@@ -273,6 +213,8 @@ public class JdbcDBClient extends DB {
       System.out.println("Using shards: " + shardCount + ", batchSize:" + batchSize + ", fetchSize: " + jdbcFetchSize);
 
       cachedStatements = new ConcurrentHashMap<StatementType, PreparedStatement>();
+
+      this.dbFlavor = DBFlavor.fromJdbcUrl(urlArr[0]);
     } catch (ClassNotFoundException e) {
       System.err.println("Error in initializing the JDBS driver: " + e);
       throw new DBException(e);
@@ -283,6 +225,7 @@ public class JdbcDBClient extends DB {
       System.err.println("Invalid value for fieldcount property. " + e);
       throw new DBException(e);
     }
+
     initialized = true;
   }
 
@@ -310,16 +253,10 @@ public class JdbcDBClient extends DB {
     }
   }
 
-  private PreparedStatement createAndCacheInsertStatement(StatementType insertType, String key) throws SQLException {
-    StringBuilder insert = new StringBuilder("INSERT INTO ");
-    insert.append(insertType.tableName);
-    insert.append(" (" + PRIMARY_KEY + "," + insertType.fieldString + ")");
-    insert.append(" VALUES(?");
-    for (int i = 0; i < insertType.numFields; i++) {
-      insert.append(",?");
-    }
-    insert.append(")");
-    PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(insert.toString());
+  private PreparedStatement createAndCacheInsertStatement(StatementType insertType, String key)
+      throws SQLException {
+    String insert = dbFlavor.createInsertStatement(insertType, key);
+    PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(insert);
     PreparedStatement stmt = cachedStatements.putIfAbsent(insertType, insertStatement);
     if (stmt == null) {
       return insertStatement;
@@ -327,14 +264,10 @@ public class JdbcDBClient extends DB {
     return stmt;
   }
 
-  private PreparedStatement createAndCacheReadStatement(StatementType readType, String key) throws SQLException {
-    StringBuilder read = new StringBuilder("SELECT * FROM ");
-    read.append(readType.tableName);
-    read.append(" WHERE ");
-    read.append(PRIMARY_KEY);
-    read.append(" = ");
-    read.append("?");
-    PreparedStatement readStatement = getShardConnectionByKey(key).prepareStatement(read.toString());
+  private PreparedStatement createAndCacheReadStatement(StatementType readType, String key)
+      throws SQLException {
+    String read = dbFlavor.createReadStatement(readType, key);
+    PreparedStatement readStatement = getShardConnectionByKey(key).prepareStatement(read);
     PreparedStatement stmt = cachedStatements.putIfAbsent(readType, readStatement);
     if (stmt == null) {
       return readStatement;
@@ -342,13 +275,10 @@ public class JdbcDBClient extends DB {
     return stmt;
   }
 
-  private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType, String key) throws SQLException {
-    StringBuilder delete = new StringBuilder("DELETE FROM ");
-    delete.append(deleteType.tableName);
-    delete.append(" WHERE ");
-    delete.append(PRIMARY_KEY);
-    delete.append(" = ?");
-    PreparedStatement deleteStatement = getShardConnectionByKey(key).prepareStatement(delete.toString());
+  private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType, String key)
+      throws SQLException {
+    String delete = dbFlavor.createDeleteStatement(deleteType, key);
+    PreparedStatement deleteStatement = getShardConnectionByKey(key).prepareStatement(delete);
     PreparedStatement stmt = cachedStatements.putIfAbsent(deleteType, deleteStatement);
     if (stmt == null) {
       return deleteStatement;
@@ -356,22 +286,10 @@ public class JdbcDBClient extends DB {
     return stmt;
   }
 
-  private PreparedStatement createAndCacheUpdateStatement(StatementType updateType, String key) throws SQLException {
-    String[] fieldKeys = updateType.fieldString.split(",");
-    StringBuilder update = new StringBuilder("UPDATE ");
-    update.append(updateType.tableName);
-    update.append(" SET ");
-    for (int i = 0; i < fieldKeys.length; i++) {
-      update.append(fieldKeys[i]);
-      update.append("=?");
-      if (i < fieldKeys.length - 1) {
-        update.append(", ");
-      }
-    }
-    update.append(" WHERE ");
-    update.append(PRIMARY_KEY);
-    update.append(" = ?");
-    PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(update.toString());
+  private PreparedStatement createAndCacheUpdateStatement(StatementType updateType, String key)
+      throws SQLException {
+    String update = dbFlavor.createUpdateStatement(updateType, key);
+    PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(update);
     PreparedStatement stmt = cachedStatements.putIfAbsent(updateType, insertStatement);
     if (stmt == null) {
       return insertStatement;
@@ -379,16 +297,10 @@ public class JdbcDBClient extends DB {
     return stmt;
   }
 
-  private PreparedStatement createAndCacheScanStatement(StatementType scanType, String key) throws SQLException {
-    StringBuilder select = new StringBuilder("SELECT * FROM ");
-    select.append(scanType.tableName);
-    select.append(" WHERE ");
-    select.append(PRIMARY_KEY);
-    select.append(" >= ?");
-    select.append(" ORDER BY ");
-    select.append(PRIMARY_KEY);
-    select.append(" LIMIT ?");
-    PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select.toString());
+  private PreparedStatement createAndCacheScanStatement(StatementType scanType, String key)
+      throws SQLException {
+    String select = dbFlavor.createScanStatement(scanType, key);
+    PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select);
     if (this.jdbcFetchSize > 0) {
       scanStatement.setFetchSize(this.jdbcFetchSize);
     }
@@ -500,24 +412,49 @@ public class JdbcDBClient extends DB {
       for (String value: fieldInfo.getFieldValues()) {
         insertStatement.setString(index++, value);
       }
-      int result;
-      if (batchSize > 0) {
+      // Using the batch insert API
+      if (batchUpdates) {
         insertStatement.addBatch();
-        if (++numRowsInBatch % batchSize == 0) {
-          int[] results = insertStatement.executeBatch();
-          for (int r : results) {
-            if (r != 1) {
-              return Status.ERROR;
+        // Check for a sane batch size
+        if (batchSize > 0) {
+          // Commit the batch after it grows beyond the configured size
+          if (++numRowsInBatch % batchSize == 0) {
+            int[] results = insertStatement.executeBatch();
+            for (int r : results) {
+              if (r != 1) {
+                return Status.ERROR;
+              }
+            }
+            // If autoCommit is off, make sure we commit the batch
+            if (!autoCommit) {
+              getShardConnectionByKey(key).commit();
+            }
+            return Status.OK;
+          } // else, the default value of -1 or a nonsense. Treat it as an infinitely large batch.
+        } // else, we let the batch accumulate
+        // Added element to the batch, potentially committing the batch too.
+        return Status.BATCHED_OK;
+      } else {
+        // Normal update
+        int result = insertStatement.executeUpdate();
+        // If we are not autoCommit, we might have to commit now
+        if (!autoCommit) {
+          // Let updates be batcher locally
+          if (batchSize > 0) {
+            if (++numRowsInBatch % batchSize == 0) {
+              // Send the batch of updates
+              getShardConnectionByKey(key).commit();
             }
+            // uhh
+            return Status.OK;
+          } else {
+            // Commit each update
+            getShardConnectionByKey(key).commit();
           }
+        }
+        if (result == 1) {
           return Status.OK;
         }
-        return Status.BATCHED_OK;
-      } else {
-        result = insertStatement.executeUpdate();
-      }
-      if (result == 1) {
-        return Status.OK;
       }
       return Status.UNEXPECTED_STATE;
     } catch (SQLException e) {
@@ -548,7 +485,7 @@ public class JdbcDBClient extends DB {
 
   private OrderedFieldInfo getFieldInfo(HashMap<String, ByteIterator> values) {
     String fieldKeys = "";
-    List<String> fieldValues = new ArrayList();
+    List<String> fieldValues = new ArrayList<>();
     int count = 0;
     for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
       fieldKeys += entry.getKey();
diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/StatementType.java b/jdbc/src/main/java/com/yahoo/ycsb/db/StatementType.java
new file mode 100644
index 0000000000000000000000000000000000000000..e20d078ea8835393daeb42f1131b1e7bb9833966
--- /dev/null
+++ b/jdbc/src/main/java/com/yahoo/ycsb/db/StatementType.java
@@ -0,0 +1,110 @@
+/**
+ * Copyright (c) 2010 Yahoo! Inc., 2016 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package com.yahoo.ycsb.db;
+
+/**
+ * The statement type for the prepared statements.
+ */
+public class StatementType {
+
+  enum Type {
+    INSERT(1), DELETE(2), READ(3), UPDATE(4), SCAN(5);
+
+    private final int internalType;
+
+    private Type(int type) {
+      internalType = type;
+    }
+
+    int getHashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + internalType;
+      return result;
+    }
+  }
+
+  private Type type;
+  private int shardIndex;
+  private int numFields;
+  private String tableName;
+  private String fieldString;
+
+  public StatementType(Type type, String tableName, int numFields, String fieldString, int shardIndex) {
+    this.type = type;
+    this.tableName = tableName;
+    this.numFields = numFields;
+    this.fieldString = fieldString;
+    this.shardIndex = shardIndex;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String getFieldString() {
+    return fieldString;
+  }
+
+  public int getNumFields() {
+    return numFields;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + numFields + 100 * shardIndex;
+    result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
+    result = prime * result + ((type == null) ? 0 : type.getHashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    StatementType other = (StatementType) obj;
+    if (numFields != other.numFields) {
+      return false;
+    }
+    if (shardIndex != other.shardIndex) {
+      return false;
+    }
+    if (tableName == null) {
+      if (other.tableName != null) {
+        return false;
+      }
+    } else if (!tableName.equals(other.tableName)) {
+      return false;
+    }
+    if (type != other.type) {
+      return false;
+    }
+    if (!fieldString.equals(other.fieldString)) {
+      return false;
+    }
+    return true;
+  }
+}
diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/flavors/DBFlavor.java b/jdbc/src/main/java/com/yahoo/ycsb/db/flavors/DBFlavor.java
new file mode 100644
index 0000000000000000000000000000000000000000..41ccb221588d448045aa693dbb6157f78ab29122
--- /dev/null
+++ b/jdbc/src/main/java/com/yahoo/ycsb/db/flavors/DBFlavor.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright (c) 2016 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package com.yahoo.ycsb.db.flavors;
+
+import com.yahoo.ycsb.db.StatementType;
+
+/**
+ * DBFlavor captures minor differences in syntax and behavior among JDBC implementations and SQL
+ * dialects. This class also acts as a factory to instantiate concrete flavors based on the JDBC URL.
+ */
+public abstract class DBFlavor {
+
+  enum DBName {
+    DEFAULT,
+    PHOENIX
+  }
+
+  private final DBName dbName;
+
+  public DBFlavor(DBName dbName) {
+    this.dbName = dbName;
+  }
+
+  public static DBFlavor fromJdbcUrl(String url) {
+    if (url.startsWith("jdbc:phoenix")) {
+      return new PhoenixDBFlavor();
+    }
+    return new DefaultDBFlavor();
+  }
+
+  /**
+   * Create and return a SQL statement for inserting data.
+   */
+  public abstract String createInsertStatement(StatementType insertType, String key);
+
+  /**
+   * Create and return a SQL statement for reading data.
+   */
+  public abstract String createReadStatement(StatementType readType, String key);
+
+  /**
+   * Create and return a SQL statement for deleting data.
+   */
+  public abstract String createDeleteStatement(StatementType deleteType, String key);
+
+  /**
+   * Create and return a SQL statement for updating data.
+   */
+  public abstract String createUpdateStatement(StatementType updateType, String key);
+
+  /**
+   * Create and return a SQL statement for scanning data.
+   */
+  public abstract String createScanStatement(StatementType scanType, String key);
+}
diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/flavors/DefaultDBFlavor.java b/jdbc/src/main/java/com/yahoo/ycsb/db/flavors/DefaultDBFlavor.java
new file mode 100644
index 0000000000000000000000000000000000000000..3af4829725734743a2650c934051569a420aea8c
--- /dev/null
+++ b/jdbc/src/main/java/com/yahoo/ycsb/db/flavors/DefaultDBFlavor.java
@@ -0,0 +1,98 @@
+/**
+ * Copyright (c) 2016 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package com.yahoo.ycsb.db.flavors;
+
+import com.yahoo.ycsb.db.JdbcDBClient;
+import com.yahoo.ycsb.db.StatementType;
+
+/**
+ * A default flavor for relational databases.
+ */
+public class DefaultDBFlavor extends DBFlavor {
+  public DefaultDBFlavor() {
+    super(DBName.DEFAULT);
+  }
+  public DefaultDBFlavor(DBName dbName) {
+    super(dbName);
+  }
+
+  @Override
+  public String createInsertStatement(StatementType insertType, String key) {
+    StringBuilder insert = new StringBuilder("INSERT INTO ");
+    insert.append(insertType.getTableName());
+    insert.append(" (" + JdbcDBClient.PRIMARY_KEY + "," + insertType.getFieldString() + ")");
+    insert.append(" VALUES(?");
+    for (int i = 0; i < insertType.getNumFields(); i++) {
+      insert.append(",?");
+    }
+    insert.append(")");
+    return insert.toString();
+  }
+
+  @Override
+  public String createReadStatement(StatementType readType, String key) {
+    StringBuilder read = new StringBuilder("SELECT * FROM ");
+    read.append(readType.getTableName());
+    read.append(" WHERE ");
+    read.append(JdbcDBClient.PRIMARY_KEY);
+    read.append(" = ");
+    read.append("?");
+    return read.toString();
+  }
+
+  @Override
+  public String createDeleteStatement(StatementType deleteType, String key) {
+    StringBuilder delete = new StringBuilder("DELETE FROM ");
+    delete.append(deleteType.getTableName());
+    delete.append(" WHERE ");
+    delete.append(JdbcDBClient.PRIMARY_KEY);
+    delete.append(" = ?");
+    return delete.toString();
+  }
+
+  @Override
+  public String createUpdateStatement(StatementType updateType, String key) {
+    String[] fieldKeys = updateType.getFieldString().split(",");
+    StringBuilder update = new StringBuilder("UPDATE ");
+    update.append(updateType.getTableName());
+    update.append(" SET ");
+    for (int i = 0; i < fieldKeys.length; i++) {
+      update.append(fieldKeys[i]);
+      update.append("=?");
+      if (i < fieldKeys.length - 1) {
+        update.append(", ");
+      }
+    }
+    update.append(" WHERE ");
+    update.append(JdbcDBClient.PRIMARY_KEY);
+    update.append(" = ?");
+    return update.toString();
+  }
+
+  @Override
+  public String createScanStatement(StatementType scanType, String key) {
+    StringBuilder select = new StringBuilder("SELECT * FROM ");
+    select.append(scanType.getTableName());
+    select.append(" WHERE ");
+    select.append(JdbcDBClient.PRIMARY_KEY);
+    select.append(" >= ?");
+    select.append(" ORDER BY ");
+    select.append(JdbcDBClient.PRIMARY_KEY);
+    select.append(" LIMIT ?");
+    return select.toString();
+  }
+}
diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/flavors/PhoenixDBFlavor.java b/jdbc/src/main/java/com/yahoo/ycsb/db/flavors/PhoenixDBFlavor.java
new file mode 100644
index 0000000000000000000000000000000000000000..9a37923297bd99c34c652dbee68fb883ff0c52d0
--- /dev/null
+++ b/jdbc/src/main/java/com/yahoo/ycsb/db/flavors/PhoenixDBFlavor.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright (c) 2016 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package com.yahoo.ycsb.db.flavors;
+
+import com.yahoo.ycsb.db.JdbcDBClient;
+import com.yahoo.ycsb.db.StatementType;
+
+/**
+ * Database flavor for Apache Phoenix. Captures syntax differences used by Phoenix.
+ */
+public class PhoenixDBFlavor extends DefaultDBFlavor {
+  public PhoenixDBFlavor() {
+    super(DBName.PHOENIX);
+  }
+
+  @Override
+  public String createInsertStatement(StatementType insertType, String key) {
+    // Phoenix uses UPSERT syntax
+    StringBuilder insert = new StringBuilder("UPSERT INTO ");
+    insert.append(insertType.getTableName());
+    insert.append(" (" + JdbcDBClient.PRIMARY_KEY + "," + insertType.getFieldString() + ")");
+    insert.append(" VALUES(?");
+    for (int i = 0; i < insertType.getNumFields(); i++) {
+      insert.append(",?");
+    }
+    insert.append(")");
+    return insert.toString();
+  }
+
+  @Override
+  public String createUpdateStatement(StatementType updateType, String key) {
+    // Phoenix doesn't have UPDATE semantics, just re-use UPSERT VALUES on the specific columns
+    String[] fieldKeys = updateType.getFieldString().split(",");
+    StringBuilder update = new StringBuilder("UPSERT INTO ");
+    update.append(updateType.getTableName());
+    update.append(" (");
+    // Each column to update
+    for (int i = 0; i < fieldKeys.length; i++) {
+      update.append(fieldKeys[i]).append(",");
+    }
+    // And then set the primary key column
+    update.append(JdbcDBClient.PRIMARY_KEY).append(") VALUES(");
+    // Add an unbound param for each column to update
+    for (int i = 0; i < fieldKeys.length; i++) {
+      update.append("?, ");
+    }
+    // Then the primary key column's value
+    update.append("?)");
+    return update.toString();
+  }
+}
diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/flavors/package-info.java b/jdbc/src/main/java/com/yahoo/ycsb/db/flavors/package-info.java
new file mode 100644
index 0000000000000000000000000000000000000000..bac74c5efe7ba40f698d26b33255366612cc9a05
--- /dev/null
+++ b/jdbc/src/main/java/com/yahoo/ycsb/db/flavors/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Copyright (c) 2016 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+/**
+ * This package contains a collection of database-specific overrides. This accounts for the variance
+ * that can be present where JDBC does not explicitly define what a database must do or when a
+ * database has a non-standard SQL implementation.
+ */
+package com.yahoo.ycsb.db.flavors;
diff --git a/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java b/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java
index 6317f483c3f63aecf9cbf0ab216c23d473db2469..d298f9e45f8bc9f8ec91b82dc90a8ae4205aadd7 100644
--- a/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java
+++ b/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2015 - 2016 Yahoo! Inc. All rights reserved.
+ * Copyright (c) 2015 - 2016 Yahoo! Inc., 2016 YCSB contributors. All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you
  * may not use this file except in compliance with the License. You
@@ -46,10 +46,10 @@ public class JdbcDBClientTest {
 
     @BeforeClass
     public static void setup() {
-      setupWithBatch(1);
+      setupWithBatch(1, true);
     }
 
-    public static void setupWithBatch(int batchSize) {
+    public static void setupWithBatch(int batchSize, boolean autoCommit) {
       try {
         jdbcConnection = DriverManager.getConnection(TEST_DB_URL);
         jdbcDBClient = new JdbcDBClient();
@@ -59,6 +59,8 @@ public class JdbcDBClientTest {
         p.setProperty(JdbcDBClient.DRIVER_CLASS, TEST_DB_DRIVER);
         p.setProperty(JdbcDBClient.CONNECTION_USER, TEST_DB_USER);
         p.setProperty(JdbcDBClient.DB_BATCH_SIZE, Integer.toString(batchSize));
+        p.setProperty(JdbcDBClient.JDBC_BATCH_UPDATES, "true");
+        p.setProperty(JdbcDBClient.JDBC_AUTO_COMMIT, Boolean.toString(autoCommit));
 
         jdbcDBClient.setProperties(p);
         jdbcDBClient.init();
@@ -337,19 +339,18 @@ public class JdbcDBClientTest {
 
     public void insertBatchTest(int numRows) throws DBException {
       teardown();
-      setupWithBatch(10);
-
+      setupWithBatch(10, false);
       try {
         String insertKey = "user0";
         HashMap<String, ByteIterator> insertMap = insertRow(insertKey);
+        assertEquals(3, insertMap.size());
 
         ResultSet resultSet = jdbcConnection.prepareStatement(
           String.format("SELECT * FROM %s", TABLE_NAME)
             ).executeQuery();
 
-        // Check we do not have a result Row (because batch is not full yet
+        // Check we do not have a result Row (because batch is not full yet)
         assertFalse(resultSet.next());
-
         // insert more rows, completing 1 batch (still results are partial).
         for (int i = 1; i < numRows; i++) {
           insertMap = insertRow("user" + i);
@@ -360,6 +361,8 @@ public class JdbcDBClientTest {
 
         // call cleanup, which should insert the partial batch
         jdbcDBClient.cleanup();
+        // Prevent a teardown() from printing an error
+        jdbcDBClient = null;
 
         // Check that we have all rows
         assertNumRows(numRows);