Skip to content
Snippets Groups Projects
Commit 74b50800 authored by Enis Soztutar's avatar Enis Soztutar Committed by Josh Elser
Browse files

Add db.flavor to capture syntax differences among JDBC implementations.

Add Phoenix flavor for Apache Phoenix.
parent 1e2667bb
No related branches found
No related tags found
No related merge requests found
/**
* Copyright (c) 2010 - 2016 Yahoo! Inc. All rights reserved.
* Copyright (c) 2010 - 2016 Yahoo! Inc., 2016 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
......@@ -87,6 +87,9 @@ public class JdbcDBClient extends DB {
private static final String DEFAULT_PROP = "";
private ConcurrentMap<StatementType, PreparedStatement> cachedStatements;
private long numRowsInBatch = 0;
/** DB flavor defines DB-specific syntax and behavior for the
* particular database. Current database flavors are: {default, phoenix} */
private DBFlavor dbFlavor;
/**
* Ordered field information for insert and update statements.
......@@ -109,6 +112,55 @@ public class JdbcDBClient extends DB {
}
}
/**
* DBFlavor captures minor differences in syntax and behavior among JDBC implementations and SQL
* dialects.
*/
private abstract static class DBFlavor {
enum DBName {
DEFAULT,
PHOENIX
}
private final DBName dbName;
public DBFlavor(DBName dbName) {
this.dbName = dbName;
}
public static DBFlavor fromJdbcUrl(String url) {
if (url.startsWith("jdbc:phoenix")) {
return new PhoenixDBFlavor();
}
return new DefaultDBFlavor();
}
/**
* Create and return a SQL statement for inserting data.
*/
abstract String createInsertStatement(StatementType insertType, String key);
/**
* Create and return a SQL statement for reading data.
*/
abstract String createReadStatement(StatementType readType, String key);
/**
* Create and return a SQL statement for deleting data.
*/
abstract String createDeleteStatement(StatementType deleteType, String key);
/**
* Create and return a SQL statement for updating data.
*/
abstract String createUpdateStatement(StatementType updateType, String key);
/**
* Create and return a SQL statement for scanning data.
*/
abstract String createScanStatement(StatementType scanType, String key);
}
/**
* The statement type for the prepared statements.
*/
......@@ -232,6 +284,15 @@ public class JdbcDBClient extends DB {
return -1;
}
/** Returns parsed boolean value from the properties if set, otherwise returns defaultVal. */
private static boolean getBoolProperty(Properties props, String key, boolean defaultVal) {
String valueStr = props.getProperty(key);
if (valueStr != null) {
return Boolean.parseBoolean(valueStr);
}
return defaultVal;
}
@Override
public void init() throws DBException {
if (initialized) {
......@@ -247,8 +308,7 @@ public class JdbcDBClient extends DB {
this.jdbcFetchSize = getIntProperty(props, JDBC_FETCH_SIZE);
this.batchSize = getIntProperty(props, DB_BATCH_SIZE);
String autoCommitStr = props.getProperty(JDBC_AUTO_COMMIT, Boolean.TRUE.toString());
Boolean autoCommit = Boolean.parseBoolean(autoCommitStr);
boolean autoCommit = getBoolProperty(props, JDBC_AUTO_COMMIT, true);
try {
if (driver != null) {
......@@ -256,7 +316,8 @@ public class JdbcDBClient extends DB {
}
int shardCount = 0;
conns = new ArrayList<Connection>(3);
for (String url : urls.split(",")) {
String[] urlArr = urls.split(",");
for (String url : urlArr) {
System.out.println("Adding shard node URL: " + url);
Connection conn = DriverManager.getConnection(url, user, passwd);
......@@ -273,6 +334,8 @@ public class JdbcDBClient extends DB {
System.out.println("Using shards: " + shardCount + ", batchSize:" + batchSize + ", fetchSize: " + jdbcFetchSize);
cachedStatements = new ConcurrentHashMap<StatementType, PreparedStatement>();
this.dbFlavor = DBFlavor.fromJdbcUrl(urlArr[0]);
} catch (ClassNotFoundException e) {
System.err.println("Error in initializing the JDBS driver: " + e);
throw new DBException(e);
......@@ -283,6 +346,7 @@ public class JdbcDBClient extends DB {
System.err.println("Invalid value for fieldcount property. " + e);
throw new DBException(e);
}
initialized = true;
}
......@@ -310,16 +374,10 @@ public class JdbcDBClient extends DB {
}
}
private PreparedStatement createAndCacheInsertStatement(StatementType insertType, String key) throws SQLException {
StringBuilder insert = new StringBuilder("INSERT INTO ");
insert.append(insertType.tableName);
insert.append(" (" + PRIMARY_KEY + "," + insertType.fieldString + ")");
insert.append(" VALUES(?");
for (int i = 0; i < insertType.numFields; i++) {
insert.append(",?");
}
insert.append(")");
PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(insert.toString());
private PreparedStatement createAndCacheInsertStatement(StatementType insertType, String key)
throws SQLException {
String insert = dbFlavor.createInsertStatement(insertType, key);
PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(insert);
PreparedStatement stmt = cachedStatements.putIfAbsent(insertType, insertStatement);
if (stmt == null) {
return insertStatement;
......@@ -327,14 +385,10 @@ public class JdbcDBClient extends DB {
return stmt;
}
private PreparedStatement createAndCacheReadStatement(StatementType readType, String key) throws SQLException {
StringBuilder read = new StringBuilder("SELECT * FROM ");
read.append(readType.tableName);
read.append(" WHERE ");
read.append(PRIMARY_KEY);
read.append(" = ");
read.append("?");
PreparedStatement readStatement = getShardConnectionByKey(key).prepareStatement(read.toString());
private PreparedStatement createAndCacheReadStatement(StatementType readType, String key)
throws SQLException {
String read = dbFlavor.createReadStatement(readType, key);
PreparedStatement readStatement = getShardConnectionByKey(key).prepareStatement(read);
PreparedStatement stmt = cachedStatements.putIfAbsent(readType, readStatement);
if (stmt == null) {
return readStatement;
......@@ -342,13 +396,10 @@ public class JdbcDBClient extends DB {
return stmt;
}
private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType, String key) throws SQLException {
StringBuilder delete = new StringBuilder("DELETE FROM ");
delete.append(deleteType.tableName);
delete.append(" WHERE ");
delete.append(PRIMARY_KEY);
delete.append(" = ?");
PreparedStatement deleteStatement = getShardConnectionByKey(key).prepareStatement(delete.toString());
private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType, String key)
throws SQLException {
String delete = dbFlavor.createDeleteStatement(deleteType, key);
PreparedStatement deleteStatement = getShardConnectionByKey(key).prepareStatement(delete);
PreparedStatement stmt = cachedStatements.putIfAbsent(deleteType, deleteStatement);
if (stmt == null) {
return deleteStatement;
......@@ -356,22 +407,10 @@ public class JdbcDBClient extends DB {
return stmt;
}
private PreparedStatement createAndCacheUpdateStatement(StatementType updateType, String key) throws SQLException {
String[] fieldKeys = updateType.fieldString.split(",");
StringBuilder update = new StringBuilder("UPDATE ");
update.append(updateType.tableName);
update.append(" SET ");
for (int i = 0; i < fieldKeys.length; i++) {
update.append(fieldKeys[i]);
update.append("=?");
if (i < fieldKeys.length - 1) {
update.append(", ");
}
}
update.append(" WHERE ");
update.append(PRIMARY_KEY);
update.append(" = ?");
PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(update.toString());
private PreparedStatement createAndCacheUpdateStatement(StatementType updateType, String key)
throws SQLException {
String update = dbFlavor.createUpdateStatement(updateType, key);
PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(update);
PreparedStatement stmt = cachedStatements.putIfAbsent(updateType, insertStatement);
if (stmt == null) {
return insertStatement;
......@@ -379,16 +418,10 @@ public class JdbcDBClient extends DB {
return stmt;
}
private PreparedStatement createAndCacheScanStatement(StatementType scanType, String key) throws SQLException {
StringBuilder select = new StringBuilder("SELECT * FROM ");
select.append(scanType.tableName);
select.append(" WHERE ");
select.append(PRIMARY_KEY);
select.append(" >= ?");
select.append(" ORDER BY ");
select.append(PRIMARY_KEY);
select.append(" LIMIT ?");
PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select.toString());
private PreparedStatement createAndCacheScanStatement(StatementType scanType, String key)
throws SQLException {
String select = dbFlavor.createScanStatement(scanType, key);
PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select);
if (this.jdbcFetchSize > 0) {
scanStatement.setFetchSize(this.jdbcFetchSize);
}
......@@ -399,6 +432,104 @@ public class JdbcDBClient extends DB {
return stmt;
}
private static class DefaultDBFlavor extends DBFlavor {
public DefaultDBFlavor() {
super(DBName.DEFAULT);
}
public DefaultDBFlavor(DBName dbName) {
super(dbName);
}
@Override
String createInsertStatement(StatementType insertType, String key) {
StringBuilder insert = new StringBuilder("INSERT INTO ");
insert.append(insertType.tableName);
insert.append(" (" + PRIMARY_KEY + "," + insertType.fieldString + ")");
insert.append(" VALUES(?");
for (int i = 0; i < insertType.numFields; i++) {
insert.append(",?");
}
insert.append(")");
return insert.toString();
}
@Override
String createReadStatement(StatementType readType, String key) {
StringBuilder read = new StringBuilder("SELECT * FROM ");
read.append(readType.tableName);
read.append(" WHERE ");
read.append(PRIMARY_KEY);
read.append(" = ");
read.append("?");
return read.toString();
}
@Override
String createDeleteStatement(StatementType deleteType, String key) {
StringBuilder delete = new StringBuilder("DELETE FROM ");
delete.append(deleteType.tableName);
delete.append(" WHERE ");
delete.append(PRIMARY_KEY);
delete.append(" = ?");
return delete.toString();
}
@Override
String createUpdateStatement(StatementType updateType, String key) {
String[] fieldKeys = updateType.fieldString.split(",");
StringBuilder update = new StringBuilder("UPDATE ");
update.append(updateType.tableName);
update.append(" SET ");
for (int i = 0; i < fieldKeys.length; i++) {
update.append(fieldKeys[i]);
update.append("=?");
if (i < fieldKeys.length - 1) {
update.append(", ");
}
}
update.append(" WHERE ");
update.append(PRIMARY_KEY);
update.append(" = ?");
return update.toString();
}
@Override
String createScanStatement(StatementType scanType, String key) {
StringBuilder select = new StringBuilder("SELECT * FROM ");
select.append(scanType.tableName);
select.append(" WHERE ");
select.append(PRIMARY_KEY);
select.append(" >= ?");
select.append(" ORDER BY ");
select.append(PRIMARY_KEY);
select.append(" LIMIT ?");
return select.toString();
}
}
/**
* Database flavor for Apache Phoenix. Captures syntax differences used by Phoenix.
*/
private static class PhoenixDBFlavor extends DefaultDBFlavor {
public PhoenixDBFlavor() {
super(DBName.PHOENIX);
}
@Override
String createInsertStatement(StatementType insertType, String key) {
// Phoenix uses UPSERT syntax
StringBuilder insert = new StringBuilder("UPSERT INTO ");
insert.append(insertType.tableName);
insert.append(" (" + PRIMARY_KEY + "," + insertType.fieldString + ")");
insert.append(" VALUES(?");
for (int i = 0; i < insertType.numFields; i++) {
insert.append(",?");
}
insert.append(")");
return insert.toString();
}
}
@Override
public Status read(String tableName, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
try {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment