Skip to content
Snippets Groups Projects
Commit 9ebc5a47 authored by Ilya Bakulin's avatar Ilya Bakulin
Browse files

- Added simple sharding to JDBC driver.

parent 5a973258
No related branches found
No related tags found
No related merge requests found
......@@ -17,22 +17,14 @@
package com.yahoo.ycsb.db;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* A class that wraps a JDBC compliant database to allow it to be interfaced with YCSB.
* This class extends {@link DB} and implements the database interface used by YCSB client.
......@@ -58,7 +50,7 @@ import com.yahoo.ycsb.DBException;
*/
public class JdbcDBClient extends DB implements JdbcDBClientConstants {
private Connection conn;
private ArrayList<Connection> conns;
private boolean initialized = false;
private Properties props;
private static final String DEFAULT_PROP = "";
......@@ -90,20 +82,22 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
}
Type type;
int shardIndex;
int numFields;
String tableName;
StatementType(Type type, String tableName, int numFields) {
StatementType(Type type, String tableName, int numFields, int _shardIndex) {
this.type = type;
this.tableName = tableName;
this.numFields = numFields;
this.shardIndex = _shardIndex;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + numFields;
result = prime * result + numFields + 100 * shardIndex;
result = prime * result
+ ((tableName == null) ? 0 : tableName.hashCode());
result = prime * result + ((type == null) ? 0 : type.getHashCode());
......@@ -121,6 +115,8 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
StatementType other = (StatementType) obj;
if (numFields != other.numFields)
return false;
if (shardIndex != other.shardIndex)
return false;
if (tableName == null) {
if (other.tableName != null)
return false;
......@@ -131,6 +127,35 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
return true;
}
}
/**
* For the given key, returns what shard contains data for this key
*
* @param key Data key to do operation on
* @return Shard index
*/
private int getShardIndexByKey(String key) {
int ret = Math.abs(key.hashCode()) % conns.size();
//System.out.println(conns.size() + ": Shard instance for "+ key + " (hash " + key.hashCode()+ " ) " + " is " + ret);
return ret;
}
/**
* For the given key, returns Connection object that holds connection
* to the shard that contains this key
*
* @param key Data key to get information for
* @return Connection object
*/
private Connection getShardConnectionByKey(String key) {
return conns.get(getShardIndexByKey(key));
}
private void cleanupAllConnections() throws SQLException {
for(Connection conn: conns) {
conn.close();
}
}
/**
* Initialize the database connection and set it up for sending requests to the database.
......@@ -144,18 +169,29 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
return;
}
props = getProperties();
String url = props.getProperty(CONNECTION_URL, DEFAULT_PROP);
String urls = props.getProperty(CONNECTION_URL, DEFAULT_PROP);
String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP);
String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP);
String driver = props.getProperty(DRIVER_CLASS);
try {
try {
if (driver != null) {
Class.forName(driver);
}
conn = DriverManager.getConnection(url, user, passwd);
// Since there is no explicit commit method in the DB interface, all
// operations should auto commit.
conn.setAutoCommit(true);
int shardCount = 0;
conns = new ArrayList<Connection>(3);
for (String url: urls.split(",")) {
System.out.println("Adding shard node URL: " + url);
Connection conn = DriverManager.getConnection(url, user, passwd);
// Since there is no explicit commit method in the DB interface, all
// operations should auto commit.
conn.setAutoCommit(true);
shardCount++;
conns.add(conn);
}
System.out.println("Using " + shardCount + " shards");
cachedStatements = new ConcurrentHashMap<StatementType, PreparedStatement>();
} catch (ClassNotFoundException e) {
System.err.println("Error in initializing the JDBS driver: " + e);
......@@ -173,14 +209,14 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
@Override
public void cleanup() throws DBException {
try {
conn.close();
cleanupAllConnections();
} catch (SQLException e) {
System.err.println("Error in closing the connection. " + e);
throw new DBException(e);
}
}
private PreparedStatement createAndCacheInsertStatement(StatementType insertType)
private PreparedStatement createAndCacheInsertStatement(StatementType insertType, String key)
throws SQLException {
StringBuilder insert = new StringBuilder("INSERT INTO ");
insert.append(insertType.tableName);
......@@ -189,13 +225,13 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
insert.append(",?");
}
insert.append(");");
PreparedStatement insertStatement = conn.prepareStatement(insert.toString());
PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(insert.toString());
PreparedStatement stmt = cachedStatements.putIfAbsent(insertType, insertStatement);
if (stmt == null) return insertStatement;
else return stmt;
}
private PreparedStatement createAndCacheReadStatement(StatementType readType)
private PreparedStatement createAndCacheReadStatement(StatementType readType, String key)
throws SQLException {
StringBuilder read = new StringBuilder("SELECT * FROM ");
read.append(readType.tableName);
......@@ -203,26 +239,26 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
read.append(PRIMARY_KEY);
read.append(" = ");
read.append("?;");
PreparedStatement readStatement = conn.prepareStatement(read.toString());
PreparedStatement readStatement = getShardConnectionByKey(key).prepareStatement(read.toString());
PreparedStatement stmt = cachedStatements.putIfAbsent(readType, readStatement);
if (stmt == null) return readStatement;
else return stmt;
}
private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType)
private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType, String key)
throws SQLException {
StringBuilder delete = new StringBuilder("DELETE FROM ");
delete.append(deleteType.tableName);
delete.append(" WHERE ");
delete.append(PRIMARY_KEY);
delete.append(" = ?;");
PreparedStatement deleteStatement = conn.prepareStatement(delete.toString());
PreparedStatement deleteStatement = getShardConnectionByKey(key).prepareStatement(delete.toString());
PreparedStatement stmt = cachedStatements.putIfAbsent(deleteType, deleteStatement);
if (stmt == null) return deleteStatement;
else return stmt;
}
private PreparedStatement createAndCacheUpdateStatement(StatementType updateType)
private PreparedStatement createAndCacheUpdateStatement(StatementType updateType, String key)
throws SQLException {
StringBuilder update = new StringBuilder("UPDATE ");
update.append(updateType.tableName);
......@@ -236,13 +272,13 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
update.append(" WHERE ");
update.append(PRIMARY_KEY);
update.append(" = ?;");
PreparedStatement insertStatement = conn.prepareStatement(update.toString());
PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(update.toString());
PreparedStatement stmt = cachedStatements.putIfAbsent(updateType, insertStatement);
if (stmt == null) return insertStatement;
else return stmt;
}
private PreparedStatement createAndCacheScanStatement(StatementType scanType)
private PreparedStatement createAndCacheScanStatement(StatementType scanType, String key)
throws SQLException {
StringBuilder select = new StringBuilder("SELECT * FROM ");
select.append(scanType.tableName);
......@@ -250,7 +286,7 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
select.append(PRIMARY_KEY);
select.append(" >= ");
select.append("?;");
PreparedStatement scanStatement = conn.prepareStatement(select.toString());
PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select.toString());
PreparedStatement stmt = cachedStatements.putIfAbsent(scanType, scanStatement);
if (stmt == null) return scanStatement;
else return stmt;
......@@ -266,10 +302,10 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
return -1;
}
try {
StatementType type = new StatementType(StatementType.Type.READ, tableName, 1);
StatementType type = new StatementType(StatementType.Type.READ, tableName, 1, getShardIndexByKey(key));
PreparedStatement readStatement = cachedStatements.get(type);
if (readStatement == null) {
readStatement = createAndCacheReadStatement(type);
readStatement = createAndCacheReadStatement(type, key);
}
readStatement.setString(1, key);
ResultSet resultSet = readStatement.executeQuery();
......@@ -286,6 +322,7 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
resultSet.close();
return SUCCESS;
} catch (SQLException e) {
System.err.println("Error in processing read of table " + tableName + ": "+e);
return -2;
}
}
......@@ -300,10 +337,10 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
return -1;
}
try {
StatementType type = new StatementType(StatementType.Type.SCAN, tableName, 1);
StatementType type = new StatementType(StatementType.Type.SCAN, tableName, 1, getShardIndexByKey(startKey));
PreparedStatement scanStatement = cachedStatements.get(type);
if (scanStatement == null) {
scanStatement = createAndCacheScanStatement(type);
scanStatement = createAndCacheScanStatement(type, startKey);
}
scanStatement.setString(1, startKey);
ResultSet resultSet = scanStatement.executeQuery();
......@@ -335,10 +372,10 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
}
try {
int numFields = values.size();
StatementType type = new StatementType(StatementType.Type.UPDATE, tableName, numFields);
StatementType type = new StatementType(StatementType.Type.UPDATE, tableName, numFields, getShardIndexByKey(key));
PreparedStatement updateStatement = cachedStatements.get(type);
if (updateStatement == null) {
updateStatement = createAndCacheUpdateStatement(type);
updateStatement = createAndCacheUpdateStatement(type, key);
}
int index = 1;
for (Map.Entry<String, String> entry : values.entrySet()) {
......@@ -364,10 +401,10 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
}
try {
int numFields = values.size();
StatementType type = new StatementType(StatementType.Type.INSERT, tableName, numFields);
StatementType type = new StatementType(StatementType.Type.INSERT, tableName, numFields, getShardIndexByKey(key));
PreparedStatement insertStatement = cachedStatements.get(type);
if (insertStatement == null) {
insertStatement = createAndCacheInsertStatement(type);
insertStatement = createAndCacheInsertStatement(type, key);
}
insertStatement.setString(1, key);
int index = 2;
......@@ -393,10 +430,10 @@ public class JdbcDBClient extends DB implements JdbcDBClientConstants {
return -1;
}
try {
StatementType type = new StatementType(StatementType.Type.DELETE, tableName, 1);
StatementType type = new StatementType(StatementType.Type.DELETE, tableName, 1, getShardIndexByKey(key));
PreparedStatement deleteStatement = cachedStatements.get(type);
if (deleteStatement == null) {
deleteStatement = createAndCacheDeleteStatement(type);
deleteStatement = createAndCacheDeleteStatement(type, key);
}
deleteStatement.setString(1, key);
int result = deleteStatement.executeUpdate();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment