diff --git a/hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java b/hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java index a41c1987325afef791e0cc5b5bfa37f9aa737bef..da72f4f86c6d0101defde14c7548c49ea887f213 100644 --- a/hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java +++ b/hbase10/src/main/java/com/yahoo/ycsb/db/HBaseClient10.java @@ -51,6 +51,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; /** * HBase 1.0 client for YCSB framework. @@ -63,14 +64,22 @@ import java.util.Vector; */ public class HBaseClient10 extends com.yahoo.ycsb.DB { private Configuration config = HBaseConfiguration.create(); - - // Must be an object for synchronization and tracking running thread counts. - private static Integer threadCount = 0; + + private static AtomicInteger threadCount = new AtomicInteger(0); private boolean debug = false; private String tableName = ""; + + /** + * A Cluster Connection instance that is shared by all running ycsb threads. + * Needs to be initialized late so we pick up command-line configs if any. + * To ensure one instance only in a multi-threaded context, guard access + * with a 'lock' object. + * @See #CONNECTION_LOCK. + */ private static Connection connection = null; + private static final Object CONNECTION_LOCK = new Object(); // Depending on the value of clientSideBuffering, either bufferedMutator // (clientSideBuffering) or currentTable (!clientSideBuffering) will be used. @@ -121,10 +130,10 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB { UserGroupInformation.setConfiguration(config); } - if ((getProperties().getProperty("principal")!=null) + if ((getProperties().getProperty("principal")!=null) && (getProperties().getProperty("keytab")!=null)) { try { - UserGroupInformation.loginUserFromKeytab(getProperties().getProperty("principal"), + UserGroupInformation.loginUserFromKeytab(getProperties().getProperty("principal"), getProperties().getProperty("keytab")); } catch (IOException e) { System.err.println("Keytab file is not readable or not found"); @@ -133,9 +142,10 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB { } try { - synchronized(threadCount) { - ++threadCount; + threadCount.getAndIncrement(); + synchronized (CONNECTION_LOCK) { if (connection == null) { + // Initialize if not set up already. connection = ConnectionFactory.createConnection(config); } } @@ -166,7 +176,9 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB { String table = com.yahoo.ycsb.workloads.CoreWorkload.table; try { final TableName tName = TableName.valueOf(table); - connection.getTable(tName).getTableDescriptor(); + synchronized (CONNECTION_LOCK) { + connection.getTable(tName).getTableDescriptor(); + } } catch (IOException e) { throw new DBException(e); } @@ -193,11 +205,14 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB { long en = System.nanoTime(); final String type = clientSideBuffering ? "UPDATE" : "CLEANUP"; measurements.measure(type, (int) ((en - st) / 1000)); - synchronized(threadCount) { - --threadCount; - if (threadCount <= 0 && connection != null) { - connection.close(); - connection = null; + threadCount.decrementAndGet(); + if (threadCount.get() <= 0) { + // Means we are done so ok to shut down the Connection. + synchronized (CONNECTION_LOCK) { + if (connection != null) { + connection.close(); + connection = null; + } } } } catch (IOException e) { @@ -207,14 +222,13 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB { public void getHTable(String table) throws IOException { final TableName tName = TableName.valueOf(table); - this.currentTable = this.connection.getTable(tName); - // suggestions from - // http://ryantwopointoh.blogspot.com/2009/01/ - // performance-of-hbase-importing.html - if (clientSideBuffering) { - final BufferedMutatorParams p = new BufferedMutatorParams(tName); - p.writeBufferSize(writeBufferSize); - this.bufferedMutator = this.connection.getBufferedMutator(p); + synchronized (CONNECTION_LOCK) { + this.currentTable = connection.getTable(tName); + if (clientSideBuffering) { + final BufferedMutatorParams p = new BufferedMutatorParams(tName); + p.writeBufferSize(writeBufferSize); + this.bufferedMutator = connection.getBufferedMutator(p); + } } }