Skip to content
Snippets Groups Projects
Commit 76a8141f authored by Andy Kruth's avatar Andy Kruth
Browse files

Merge pull request #536 from kruthar/cassandra2-properties

[cassandra2] added better property handling
parents 2c3e45fd e2ce4e9f
No related branches found
No related tags found
No related merge requests found
......@@ -71,3 +71,10 @@ For keyspace `ycsb`, table `usertable`:
* Default value is `ONE`
- Consistency level for reads and writes, respectively. See the [DataStax documentation](http://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html) for details.
* *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above.
* `cassandra.maxconnections`
* `cassandra.coreconnections`
* Defaults for max and core connections can be found here: https://datastax.github.io/java-driver/2.1.8/features/pooling/#pool-size. Cassandra 2.0.X falls under protocol V2, Cassandra 2.1+ falls under protocol V3.
* `cassandra.connecttimeoutmillis`
* `cassandra.readtimeoutmillis`
* Defaults for connect and read timeouts can be found here: https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/SocketOptions.html.
\ No newline at end of file
......@@ -67,6 +67,7 @@ public class CassandraCQLClient extends DB {
public static final String HOSTS_PROPERTY = "hosts";
public static final String PORT_PROPERTY = "port";
public static final String PORT_PROPERTY_DEFAULT = "9042";
public static final String READ_CONSISTENCY_LEVEL_PROPERTY =
"cassandra.readconsistencylevel";
......@@ -75,6 +76,15 @@ public class CassandraCQLClient extends DB {
"cassandra.writeconsistencylevel";
public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
public static final String MAX_CONNECTIONS_PROPERTY =
"cassandra.maxconnections";
public static final String CORE_CONNECTIONS_PROPERTY =
"cassandra.coreconnections";
public static final String CONNECT_TIMEOUT_MILLIS_PROPERTY =
"cassandra.connecttimeoutmillis";
public static final String READ_TIMEOUT_MILLIS_PROPERTY =
"cassandra.readtimeoutmillis";
/**
* Count the number of times initialized to teardown on the last
* {@link #cleanup()}.
......@@ -114,12 +124,7 @@ public class CassandraCQLClient extends DB {
HOSTS_PROPERTY));
}
String[] hosts = host.split(",");
String port = getProperties().getProperty("port", "9042");
if (port == null) {
throw new DBException(String.format(
"Required property \"%s\" missing for CassandraCQLClient",
PORT_PROPERTY));
}
String port = getProperties().getProperty(PORT_PROPERTY, PORT_PROPERTY_DEFAULT);
String username = getProperties().getProperty(USERNAME_PROPERTY);
String password = getProperties().getProperty(PASSWORD_PROPERTY);
......@@ -134,7 +139,6 @@ public class CassandraCQLClient extends DB {
getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY,
WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
// public void connect(String node) {}
if ((username != null) && !username.isEmpty()) {
cluster = Cluster.builder().withCredentials(username, password)
.withPort(Integer.valueOf(port)).addContactPoints(hosts).build();
......@@ -143,18 +147,35 @@ public class CassandraCQLClient extends DB {
.addContactPoints(hosts).build();
}
// Update number of connections based on threads
int threadcount =
Integer.parseInt(getProperties().getProperty("threadcount", "1"));
cluster.getConfiguration().getPoolingOptions()
.setMaxConnectionsPerHost(HostDistance.LOCAL, threadcount);
// Set connection timeout 3min (default is 5s)
cluster.getConfiguration().getSocketOptions()
.setConnectTimeoutMillis(3 * 60 * 1000);
// Set read (execute) timeout 3min (default is 12s)
cluster.getConfiguration().getSocketOptions()
.setReadTimeoutMillis(3 * 60 * 1000);
String maxConnections = getProperties().getProperty(
MAX_CONNECTIONS_PROPERTY);
if (maxConnections != null) {
cluster.getConfiguration().getPoolingOptions()
.setMaxConnectionsPerHost(HostDistance.LOCAL,
Integer.valueOf(maxConnections));
}
String coreConnections = getProperties().getProperty(
CORE_CONNECTIONS_PROPERTY);
if (coreConnections != null) {
cluster.getConfiguration().getPoolingOptions()
.setCoreConnectionsPerHost(HostDistance.LOCAL,
Integer.valueOf(coreConnections));
}
String connectTimoutMillis = getProperties().getProperty(
CONNECT_TIMEOUT_MILLIS_PROPERTY);
if (connectTimoutMillis != null) {
cluster.getConfiguration().getSocketOptions()
.setConnectTimeoutMillis(Integer.valueOf(connectTimoutMillis));
}
String readTimoutMillis = getProperties().getProperty(
READ_TIMEOUT_MILLIS_PROPERTY);
if (readTimoutMillis != null) {
cluster.getConfiguration().getSocketOptions()
.setReadTimeoutMillis(Integer.valueOf(readTimoutMillis));
}
Metadata metadata = cluster.getMetadata();
System.err.printf("Connected to cluster: %s\n",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment