diff --git a/hbase098/README.md b/hbase098/README.md
index 8584d05edc7839f1bd86d90deddeed1d45d2b9e2..e6a7fb4158f64c7d3c19e2c7e4c96a94a7cb612c 100644
--- a/hbase098/README.md
+++ b/hbase098/README.md
@@ -74,3 +74,6 @@ Following options can be configurable using `-p`.
 * `hbase.usepagefilter` : If true, HBase
   [PageFilter](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/PageFilter.html)s
   are used to limit the number of records consumed in a scan operation. The default is true.
+* `principal`: If testing need to be done against a secure HBase cluster using Kerberos Keytab, 
+  this property can be used to pass the principal in the keytab file.
+* `keytab`: The Kerberos keytab file name and location can be passed through this property.
diff --git a/hbase098/src/main/java/com/yahoo/ycsb/db/HBaseClient.java b/hbase098/src/main/java/com/yahoo/ycsb/db/HBaseClient.java
index d0ec137dc0b7cc24c56cf5d0b826f1d898174f1a..9b097b37b4a39df69c4955bd9fbca36d09db124c 100644
--- a/hbase098/src/main/java/com/yahoo/ycsb/db/HBaseClient.java
+++ b/hbase098/src/main/java/com/yahoo/ycsb/db/HBaseClient.java
@@ -23,10 +23,14 @@ import com.yahoo.ycsb.DBException;
 import com.yahoo.ycsb.Status;
 import com.yahoo.ycsb.measurements.Measurements;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -46,6 +50,7 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * HBase client for YCSB framework
@@ -55,11 +60,13 @@ public class HBaseClient extends com.yahoo.ycsb.DB
     // BFC: Change to fix broken build (with HBase 0.20.6)
     //private static final Configuration config = HBaseConfiguration.create();
     private static final Configuration config = HBaseConfiguration.create(); //new HBaseConfiguration();
+    private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
 
     public boolean _debug=false;
 
     public String _table="";
-    public HTable _hTable=null;
+    private static HConnection _hConn=null;
+    public HTableInterface _hTable=null;
     public String _columnFamily="";
     public byte _columnFamilyBytes[];
     public boolean _clientSideBuffering = false;
@@ -94,7 +101,29 @@ public class HBaseClient extends com.yahoo.ycsb.DB
         if ("false".equals(getProperties().getProperty("hbase.usepagefilter", "true"))) {
           _usePageFilter = false;
         }
-
+        if ("kerberos".equalsIgnoreCase(config.get("hbase.security.authentication"))) {
+          config.set("hadoop.security.authentication", "Kerberos");
+          UserGroupInformation.setConfiguration(config);
+        }
+        if ( (getProperties().getProperty("principal")!=null) && (getProperties().getProperty("keytab")!=null) ){
+            try {
+                UserGroupInformation.loginUserFromKeytab(getProperties().getProperty("principal"), getProperties().getProperty("keytab"));
+            } catch (IOException e) {
+                System.err.println("Keytab file is not readable or not found");
+                throw new DBException(e);
+            }
+        }
+        try {
+            THREAD_COUNT.getAndIncrement();
+            synchronized(THREAD_COUNT) {
+              if (_hConn == null){
+                _hConn = HConnectionManager.createConnection(config);
+              }
+            }
+        } catch (IOException e) {
+            System.err.println("Connection to HBase was not successful");
+            throw new DBException(e);  
+        }
         _columnFamily = getProperties().getProperty("columnfamily");
         if (_columnFamily == null)
         {
@@ -109,7 +138,7 @@ public class HBaseClient extends com.yahoo.ycsb.DB
       String table = com.yahoo.ycsb.workloads.CoreWorkload.table;
       try
 	  {
-	      HTable ht = new HTable(config, table);
+	      HTableInterface ht = _hConn.getTable(table);
 	      ht.getTableDescriptor();
 	  }
       catch (IOException e)
@@ -132,6 +161,12 @@ public class HBaseClient extends com.yahoo.ycsb.DB
             if (_hTable != null) {
                 _hTable.flushCommits();
             }
+            synchronized(THREAD_COUNT) {
+              int threadCount = THREAD_COUNT.decrementAndGet();
+              if (threadCount <= 0 && _hConn != null) {
+                 _hConn.close();
+              }
+            }
             long en=System.nanoTime();
             _measurements.measure("UPDATE", (int)((en-st)/1000));
         } catch (IOException e) {
@@ -142,7 +177,7 @@ public class HBaseClient extends com.yahoo.ycsb.DB
     public void getHTable(String table) throws IOException
     {
         synchronized (tableLock) {
-            _hTable = new HTable(config, table);
+            _hTable = _hConn.getTable(table);
             //2 suggestions from http://ryantwopointoh.blogspot.com/2009/01/performance-of-hbase-importing.html
             _hTable.setAutoFlush(!_clientSideBuffering, true);
             _hTable.setWriteBufferSize(_writeBufferSize);
diff --git a/hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java b/hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java
index 49b699a3880e8c6abfb3c5ed18b03b3b9c3dda80..00fb615b6ccfd8439b6bba76011742c249924c74 100644
--- a/hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java
+++ b/hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java
@@ -24,6 +24,7 @@ import com.yahoo.ycsb.DBException;
 import com.yahoo.ycsb.Status;
 import com.yahoo.ycsb.measurements.Measurements;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -50,6 +51,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * HBase 1.0 client for YCSB framework.
@@ -62,11 +64,12 @@ import java.util.Vector;
  */
 public class HBaseClient10 extends com.yahoo.ycsb.DB {
   private Configuration config = HBaseConfiguration.create();
+  private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
 
   private boolean debug = false;
 
   private String tableName = "";
-  private Connection connection = null;
+  private static Connection connection = null;
 
   // Depending on the value of clientSideBuffering, either bufferedMutator
   // (clientSideBuffering) or currentTable (!clientSideBuffering) will be used.
@@ -112,8 +115,27 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
           Durability.valueOf(getProperties().getProperty("durability"));
     }
 
+    if ("kerberos".equalsIgnoreCase(config.get("hbase.security.authentication"))) {
+      config.set("hadoop.security.authentication", "Kerberos");
+      UserGroupInformation.setConfiguration(config);
+    }
+
+    if ((getProperties().getProperty("principal")!=null) 
+        && (getProperties().getProperty("keytab")!=null)) {
+      try {
+        UserGroupInformation.loginUserFromKeytab(getProperties().getProperty("principal"), 
+              getProperties().getProperty("keytab"));
+      } catch (IOException e) {
+        System.err.println("Keytab file is not readable or not found");
+        throw new DBException(e);
+      }
+    }
+
     try {
-      connection = ConnectionFactory.createConnection(config);
+      THREAD_COUNT.getAndIncrement();
+      synchronized(THREAD_COUNT) {
+        connection = ConnectionFactory.createConnection(config);
+      }
     } catch (java.io.IOException e) {
       throw new DBException(e);
     }
@@ -168,7 +190,12 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
       long en = System.nanoTime();
       final String type = clientSideBuffering ? "UPDATE" : "CLEANUP";
       measurements.measure(type, (int) ((en - st) / 1000));
-      connection.close();
+      synchronized(THREAD_COUNT) {
+        int threadCount = THREAD_COUNT.decrementAndGet();
+        if (threadCount <= 0 && connection != null) {
+          connection.close();
+        }
+      }
     } catch (IOException e) {
       throw new DBException(e);
     }