diff --git a/cassandra2/README.md b/cassandra2/README.md index e3e56b90c7cfbbe2d11c2efcaf5cb277af00bdb8..bee44c8d8c41a36f459d0b38b54d35422dddac1a 100644 --- a/cassandra2/README.md +++ b/cassandra2/README.md @@ -71,3 +71,10 @@ For keyspace `ycsb`, table `usertable`: * Default value is `ONE` - Consistency level for reads and writes, respectively. See the [DataStax documentation](http://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html) for details. * *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above. + +* `cassandra.maxconnections` +* `cassandra.coreconnections` + * Defaults for max and core connections can be found here: https://datastax.github.io/java-driver/2.1.8/features/pooling/#pool-size. Cassandra 2.0.X falls under protocol V2, Cassandra 2.1+ falls under protocol V3. +* `cassandra.connecttimeoutmillis` +* `cassandra.readtimeoutmillis` + * Defaults for connect and read timeouts can be found here: https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/SocketOptions.html. \ No newline at end of file diff --git a/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java b/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java index 9f093daf89aa9d2d27f6fce34eeaadb87f01467a..602846e98da665e1ada6a70e190e99a1acb7ab6b 100644 --- a/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java +++ b/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java @@ -67,6 +67,7 @@ public class CassandraCQLClient extends DB { public static final String HOSTS_PROPERTY = "hosts"; public static final String PORT_PROPERTY = "port"; + public static final String PORT_PROPERTY_DEFAULT = "9042"; public static final String READ_CONSISTENCY_LEVEL_PROPERTY = "cassandra.readconsistencylevel"; @@ -75,6 +76,15 @@ public class CassandraCQLClient extends DB { "cassandra.writeconsistencylevel"; public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; + public static final String MAX_CONNECTIONS_PROPERTY = + "cassandra.maxconnections"; + public static final String CORE_CONNECTIONS_PROPERTY = + "cassandra.coreconnections"; + public static final String CONNECT_TIMEOUT_MILLIS_PROPERTY = + "cassandra.connecttimeoutmillis"; + public static final String READ_TIMEOUT_MILLIS_PROPERTY = + "cassandra.readtimeoutmillis"; + /** * Count the number of times initialized to teardown on the last * {@link #cleanup()}. @@ -114,12 +124,7 @@ public class CassandraCQLClient extends DB { HOSTS_PROPERTY)); } String[] hosts = host.split(","); - String port = getProperties().getProperty("port", "9042"); - if (port == null) { - throw new DBException(String.format( - "Required property \"%s\" missing for CassandraCQLClient", - PORT_PROPERTY)); - } + String port = getProperties().getProperty(PORT_PROPERTY, PORT_PROPERTY_DEFAULT); String username = getProperties().getProperty(USERNAME_PROPERTY); String password = getProperties().getProperty(PASSWORD_PROPERTY); @@ -134,7 +139,6 @@ public class CassandraCQLClient extends DB { getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - // public void connect(String node) {} if ((username != null) && !username.isEmpty()) { cluster = Cluster.builder().withCredentials(username, password) .withPort(Integer.valueOf(port)).addContactPoints(hosts).build(); @@ -143,18 +147,35 @@ public class CassandraCQLClient extends DB { .addContactPoints(hosts).build(); } - // Update number of connections based on threads - int threadcount = - Integer.parseInt(getProperties().getProperty("threadcount", "1")); - cluster.getConfiguration().getPoolingOptions() - .setMaxConnectionsPerHost(HostDistance.LOCAL, threadcount); - - // Set connection timeout 3min (default is 5s) - cluster.getConfiguration().getSocketOptions() - .setConnectTimeoutMillis(3 * 60 * 1000); - // Set read (execute) timeout 3min (default is 12s) - cluster.getConfiguration().getSocketOptions() - .setReadTimeoutMillis(3 * 60 * 1000); + String maxConnections = getProperties().getProperty( + MAX_CONNECTIONS_PROPERTY); + if (maxConnections != null) { + cluster.getConfiguration().getPoolingOptions() + .setMaxConnectionsPerHost(HostDistance.LOCAL, + Integer.valueOf(maxConnections)); + } + + String coreConnections = getProperties().getProperty( + CORE_CONNECTIONS_PROPERTY); + if (coreConnections != null) { + cluster.getConfiguration().getPoolingOptions() + .setCoreConnectionsPerHost(HostDistance.LOCAL, + Integer.valueOf(coreConnections)); + } + + String connectTimoutMillis = getProperties().getProperty( + CONNECT_TIMEOUT_MILLIS_PROPERTY); + if (connectTimoutMillis != null) { + cluster.getConfiguration().getSocketOptions() + .setConnectTimeoutMillis(Integer.valueOf(connectTimoutMillis)); + } + + String readTimoutMillis = getProperties().getProperty( + READ_TIMEOUT_MILLIS_PROPERTY); + if (readTimoutMillis != null) { + cluster.getConfiguration().getSocketOptions() + .setReadTimeoutMillis(Integer.valueOf(readTimoutMillis)); + } Metadata metadata = cluster.getMetadata(); System.err.printf("Connected to cluster: %s\n",