Skip to content
Snippets Groups Projects
Commit 1a3c8cb9 authored by Chris Larsen's avatar Chris Larsen Committed by GitHub
Browse files

[hbase10] Address #701 by mimicking the same locks from the HBase 0.9… (#1028)

* [hbase10] Fix #701 by mimicking the same locks from the HBase 0.98 client in the HBase 10 client.

* Address CR comments, thanks @allanbank.

* Addressed the table lock.

* Restore the lock on the connection shutdown.
parent 942a3174
No related branches found
No related tags found
No related merge requests found
...@@ -66,10 +66,10 @@ import static com.yahoo.ycsb.workloads.CoreWorkload.TABLENAME_PROPERTY_DEFAULT; ...@@ -66,10 +66,10 @@ import static com.yahoo.ycsb.workloads.CoreWorkload.TABLENAME_PROPERTY_DEFAULT;
* durability. * durability.
*/ */
public class HBaseClient10 extends com.yahoo.ycsb.DB { public class HBaseClient10 extends com.yahoo.ycsb.DB {
private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
private Configuration config = HBaseConfiguration.create(); private Configuration config = HBaseConfiguration.create();
private static AtomicInteger threadCount = new AtomicInteger(0);
private boolean debug = false; private boolean debug = false;
private String tableName = ""; private String tableName = "";
...@@ -82,7 +82,6 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB { ...@@ -82,7 +82,6 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
* @See #CONNECTION_LOCK. * @See #CONNECTION_LOCK.
*/ */
private static Connection connection = null; private static Connection connection = null;
private static final Object CONNECTION_LOCK = new Object();
// Depending on the value of clientSideBuffering, either bufferedMutator // Depending on the value of clientSideBuffering, either bufferedMutator
// (clientSideBuffering) or currentTable (!clientSideBuffering) will be used. // (clientSideBuffering) or currentTable (!clientSideBuffering) will be used.
...@@ -144,12 +143,19 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB { ...@@ -144,12 +143,19 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
} }
} }
String table = getProperties().getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
try { try {
threadCount.getAndIncrement(); THREAD_COUNT.getAndIncrement();
synchronized (CONNECTION_LOCK) { synchronized (THREAD_COUNT) {
if (connection == null) { if (connection == null) {
// Initialize if not set up already. // Initialize if not set up already.
connection = ConnectionFactory.createConnection(config); connection = ConnectionFactory.createConnection(config);
// Terminate right now if table does not exist, since the client
// will not propagate this error upstream once the workload
// starts.
final TableName tName = TableName.valueOf(table);
connection.getTable(tName).getTableDescriptor();
} }
} }
} catch (java.io.IOException e) { } catch (java.io.IOException e) {
...@@ -172,19 +178,6 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB { ...@@ -172,19 +178,6 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
throw new DBException("No columnfamily specified"); throw new DBException("No columnfamily specified");
} }
columnFamilyBytes = Bytes.toBytes(columnFamily); columnFamilyBytes = Bytes.toBytes(columnFamily);
// Terminate right now if table does not exist, since the client
// will not propagate this error upstream once the workload
// starts.
String table = getProperties().getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
try {
final TableName tName = TableName.valueOf(table);
synchronized (CONNECTION_LOCK) {
connection.getTable(tName).getTableDescriptor();
}
} catch (IOException e) {
throw new DBException(e);
}
} }
/** /**
...@@ -208,14 +201,14 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB { ...@@ -208,14 +201,14 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
long en = System.nanoTime(); long en = System.nanoTime();
final String type = clientSideBuffering ? "UPDATE" : "CLEANUP"; final String type = clientSideBuffering ? "UPDATE" : "CLEANUP";
measurements.measure(type, (int) ((en - st) / 1000)); measurements.measure(type, (int) ((en - st) / 1000));
threadCount.decrementAndGet(); int threadCount = THREAD_COUNT.decrementAndGet();
if (threadCount.get() <= 0) { if (threadCount <= 0) {
// Means we are done so ok to shut down the Connection. // Means we are done so ok to shut down the Connection.
synchronized (CONNECTION_LOCK) { synchronized (THREAD_COUNT) {
if (connection != null) { if (connection != null) {
connection.close(); connection.close();
connection = null; connection = null;
} }
} }
} }
} catch (IOException e) { } catch (IOException e) {
...@@ -225,13 +218,11 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB { ...@@ -225,13 +218,11 @@ public class HBaseClient10 extends com.yahoo.ycsb.DB {
public void getHTable(String table) throws IOException { public void getHTable(String table) throws IOException {
final TableName tName = TableName.valueOf(table); final TableName tName = TableName.valueOf(table);
synchronized (CONNECTION_LOCK) { this.currentTable = connection.getTable(tName);
this.currentTable = connection.getTable(tName); if (clientSideBuffering) {
if (clientSideBuffering) { final BufferedMutatorParams p = new BufferedMutatorParams(tName);
final BufferedMutatorParams p = new BufferedMutatorParams(tName); p.writeBufferSize(writeBufferSize);
p.writeBufferSize(writeBufferSize); this.bufferedMutator = connection.getBufferedMutator(p);
this.bufferedMutator = connection.getBufferedMutator(p);
}
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment