diff --git a/bin/bindings.properties b/bin/bindings.properties index e7a7d592cbff804d07eba31fc402660b02e4c82d..9c862724745948d4bd891ff972a273c73bc8176f 100644 --- a/bin/bindings.properties +++ b/bin/bindings.properties @@ -29,9 +29,6 @@ accumulo:com.yahoo.ycsb.db.accumulo.AccumuloClient aerospike:com.yahoo.ycsb.db.AerospikeClient asynchbase:com.yahoo.ycsb.db.AsyncHBaseClient basic:com.yahoo.ycsb.BasicDB -cassandra-7:com.yahoo.ycsb.db.CassandraClient7 -cassandra-8:com.yahoo.ycsb.db.CassandraClient8 -cassandra-10:com.yahoo.ycsb.db.CassandraClient10 cassandra-cql:com.yahoo.ycsb.db.CassandraCQLClient cassandra2-cql:com.yahoo.ycsb.db.CassandraCQLClient couchbase:com.yahoo.ycsb.db.CouchbaseClient diff --git a/bin/ycsb b/bin/ycsb index dad1fab2dcc3de17b2cd7bacaa86988e7689d7d3..292cc1e4094fc30fbb3a9a8ec1ea36b4324fb95f 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -55,9 +55,6 @@ DATABASES = { "aerospike" : "com.yahoo.ycsb.db.AerospikeClient", "asynchbase" : "com.yahoo.ycsb.db.AsyncHBaseClient", "basic" : "com.yahoo.ycsb.BasicDB", - "cassandra-7" : "com.yahoo.ycsb.db.CassandraClient7", - "cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8", - "cassandra-10" : "com.yahoo.ycsb.db.CassandraClient10", "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "couchbase" : "com.yahoo.ycsb.db.CouchbaseClient", @@ -243,11 +240,11 @@ def main(): # Classpath set up binding = args.database.split("-")[0] - # Deprecation message for the entire cassandra-binding - if binding == "cassandra": - warn("The 'cassandra-7', 'cassandra-8', 'cassandra-10', and " - "cassandra-cql' clients are deprecated. If you are using " - "Cassandra 2.X try using the 'cassandra2-cql' client instead.") + if binding == "cassandra2": + warn("The 'cassandra2-cql' client has been deprecated. It has been " + "renamed to simply 'cassandra-cql'. This alias will be removed" + " in the next YCSB release.") + binding = "cassandra" if binding == "couchbase": warn("The 'couchbase' client has been deprecated. If you are using " diff --git a/bin/ycsb.bat b/bin/ycsb.bat index bd609481b4e8cffd0f6830d95ec01ea837108359..f8ea7b4e25671b35586e1b9309993d610e69a7ac 100644 --- a/bin/ycsb.bat +++ b/bin/ycsb.bat @@ -117,6 +117,12 @@ GOTO confAdded SET CLASSPATH=%YCSB_HOME%\conf :confAdded +@REM Cassandra2 deprecation message +IF NOT "%BINDING_DIR%" == "cassandra2" GOTO notAliasCassandra +echo [WARN] The 'cassandra2-cql' client has been deprecated. It has been renamed to simply 'cassandra-cql'. This alias will be removed in the next YCSB release. +SET BINDING_DIR=cassandra +:notAliasCassandra + @REM Build classpath according to source checkout or release distribution IF EXIST "%YCSB_HOME%\pom.xml" GOTO gotSource @@ -178,11 +184,6 @@ FOR %%F IN (%YCSB_HOME%\%BINDING_DIR%\target\dependency\*.jar) DO ( :classpathComplete -@REM Cassandra deprecation message -IF NOT "%BINDING_DIR%" == "cassandra" GOTO notOldCassandra -echo [WARN] The 'cassandra-7', 'cassandra-8', 'cassandra-10', and cassandra-cql' clients are deprecated. If you are using Cassandra 2.X try using the 'cassandra2-cql' client instead. -:notOldCassandra - @REM Couchbase deprecation message IF NOT "%BINDING_DIR%" == "couchbase" GOTO notOldCouchbase echo [WARN] The 'couchbase' client is deprecated. If you are using Couchbase 4.0+ try using the 'couchbase2' client instead. diff --git a/bin/ycsb.sh b/bin/ycsb.sh index cc882602d8ab3c6c5b148a1e81b7fea37b22eb7b..d62c22a4e4a8659e25817d44624e8f4782ff49ff 100755 --- a/bin/ycsb.sh +++ b/bin/ycsb.sh @@ -133,6 +133,14 @@ else CLASSPATH="$CLASSPATH:$YCSB_HOME/conf" fi +# Cassandra2 deprecation message +if [ "${BINDING_DIR}" = "cassandra2" ] ; then + echo "[WARN] The 'cassandra2-cql' client has been deprecated. It has been \ +renamed to simply 'cassandra-cql'. This alias will be removed in the next \ +YCSB release." + BINDING_DIR="cassandra" +fi + # Build classpath # The "if" check after the "for" is because glob may just return the pattern # when no files are found. The "if" makes sure the file is really there. @@ -208,13 +216,6 @@ else done fi -# Cassandra deprecation message -if [ "$BINDING_DIR" = "cassandra" ] ; then - echo "[WARN] The 'cassandra-7', 'cassandra-8', 'cassandra-10', and \ -cassandra-cql' clients are deprecated. If you are using \ -Cassandra 2.X try using the 'cassandra2-cql' client instead." -fi - # Couchbase deprecation message if [ "${BINDING_DIR}" = "couchbase" ] ; then echo "[WARN] The 'couchbase' client is deprecated. If you are using \ diff --git a/cassandra/README.md b/cassandra/README.md index bff09f4e5b0256bbce154e71391d5d2d003082d1..bee44c8d8c41a36f459d0b38b54d35422dddac1a 100644 --- a/cassandra/README.md +++ b/cassandra/README.md @@ -14,59 +14,67 @@ implied. See the License for the specific language governing permissions and limitations under the License. See accompanying LICENSE file. --> -# THIS BINDING IS DEPRECATED ----------------------------- -Date of removal from YCSB: **March 2016** -Due to the low amount of use and support for older Cassandra lineages (0.X and 1.X), YCSB will not support clients for these versions either. +# Apache Cassandra 2.x CQL binding -For Cassandra 2.X use the ```cassandra2-cql``` client: https://github.com/brianfrankcooper/YCSB/tree/master/cassandra2. +Binding for [Apache Cassandra](http://cassandra.apache.org), using the CQL API +via the [DataStax +driver](http://docs.datastax.com/en/developer/java-driver/2.1/java-driver/whatsNew2.html). -# Cassandra (0.7, 0.8, 1.x) drivers for YCSB +To run against the (deprecated) Cassandra Thrift API, use the `cassandra-10` binding. -**For Cassandra 2 CQL support, use the `cassandra2-cql` binding. The Thrift drivers below are deprecated, and the CQL driver here does not support Cassandra 2.1+.** +## Creating a table for use with YCSB -There are three drivers in the Cassandra binding: +For keyspace `ycsb`, table `usertable`: -* `cassandra-7`: Cassandra 0.7 Thrift binding. -* `cassandra-8`: Cassandra 0.8 Thrift binding. -* `cassandra-10`: Cassandra 1.0+ Thrift binding. -* `cassandra-cql`: Cassandra CQL binding, for Cassandra 1.x to 2.0. See `cassandra2/README.md` for details on parameters. - -# `cassandra-10` - -## Creating a table - -Using `cassandra-cli`: - - create keyspace usertable with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options = {replication_factor:1}; - - create column family data with column_type = 'Standard' and comparator = 'UTF8Type'; + cqlsh> create keyspace ycsb + WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 3 }; + cqlsh> USE ycsb; + cqlsh> create table usertable ( + y_id varchar primary key, + field0 varchar, + field1 varchar, + field2 varchar, + field3 varchar, + field4 varchar, + field5 varchar, + field6 varchar, + field7 varchar, + field8 varchar, + field9 varchar); **Note that `replication_factor` and consistency levels (below) will affect performance.** -## Configuration Parameters +## Cassandra Configuration Parameters - `hosts` (**required**) - Cassandra nodes to connect to. - No default. * `port` - - Thrift port for communicating with Cassandra cluster. - * Default is `9160`. + * CQL port for communicating with Cassandra cluster. + * Default is `9042`. + +- `cassandra.keyspace` + Keyspace name - must match the keyspace for the table created (see above). + See http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html for details. -- `cassandra.columnfamily` - - Column family name - must match the column family for the table created (see above). - - Default value is `data` + - Default value is `ycsb` - `cassandra.username` - `cassandra.password` - Optional user name and password for authentication. See http://docs.datastax.com/en/cassandra/2.0/cassandra/security/security_config_native_authenticate_t.html for details. * `cassandra.readconsistencylevel` -* `cassandra.scanconsistencylevel` * `cassandra.writeconsistencylevel` - - Default value is `ONE` + * Default value is `ONE` - Consistency level for reads and writes, respectively. See the [DataStax documentation](http://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html) for details. - - *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above. + * *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above. + +* `cassandra.maxconnections` +* `cassandra.coreconnections` + * Defaults for max and core connections can be found here: https://datastax.github.io/java-driver/2.1.8/features/pooling/#pool-size. Cassandra 2.0.X falls under protocol V2, Cassandra 2.1+ falls under protocol V3. +* `cassandra.connecttimeoutmillis` +* `cassandra.readtimeoutmillis` + * Defaults for connect and read timeouts can be found here: https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/SocketOptions.html. \ No newline at end of file diff --git a/cassandra/pom.xml b/cassandra/pom.xml index 04db85d571dbef639ab832e4a36e6b83905fa529..67ae9fd04abb44ce6119dfa7a016ba303601133b 100644 --- a/cassandra/pom.xml +++ b/cassandra/pom.xml @@ -1,6 +1,7 @@ <?xml version="1.0" encoding="UTF-8"?> -<!-- -Copyright (c) 2012 - 2016 YCSB contributors. All rights reserved. + +<!-- +Copyright (c) 2012-2016 YCSB contributors. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You @@ -16,36 +17,65 @@ permissions and limitations under the License. See accompanying LICENSE file. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> - <groupId>com.yahoo.ycsb</groupId> - <artifactId>binding-parent</artifactId> - <version>0.11.0-SNAPSHOT</version> - <relativePath>../binding-parent</relativePath> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>binding-parent</artifactId> + <version>0.11.0-SNAPSHOT</version> + <relativePath>../binding-parent</relativePath> </parent> - + <artifactId>cassandra-binding</artifactId> - <name>Cassandra DB Binding</name> + <name>Cassandra 2.1+ DB Binding</name> <packaging>jar</packaging> + <properties> + <!-- Skip tests by default. will be activated by jdk8 profile --> + <skipTests>true</skipTests> + </properties> + <dependencies> - <dependency> - <groupId>org.apache.cassandra</groupId> - <artifactId>cassandra-all</artifactId> - <version>${cassandra.version}</version> - </dependency> - <!-- CQL driver --> - <dependency> - <groupId>com.datastax.cassandra</groupId> - <artifactId>cassandra-driver-core</artifactId> - <version>${cassandra.cql.version}</version> - </dependency> - <dependency> - <groupId>com.yahoo.ycsb</groupId> - <artifactId>core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> + <!-- CQL driver --> + <dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-core</artifactId> + <version>${cassandra.cql.version}</version> + </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.cassandraunit</groupId> + <artifactId>cassandra-unit</artifactId> + <version>3.0.0.1</version> + <classifier>shaded</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> </dependencies> + + <profiles> + <!-- Cassandra 2.2+ requires JDK8 to run, so none of our tests + will work unless we're using jdk8. + --> + <profile> + <id>jdk8-tests</id> + <activation> + <jdk>1.8</jdk> + </activation> + <properties> + <skipTests>false</skipTests> + </properties> + </profile> + </profiles> </project> diff --git a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java old mode 100755 new mode 100644 index eeaca7aeb3a7dc9280a5981cda65a7ac9383f1ee..d4dc8c7707b429f090561967d35acaa5052c4cac --- a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java +++ b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2013 Yahoo! Inc. All rights reserved. + * Copyright (c) 2013-2015 YCSB contributors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -17,24 +17,37 @@ */ package com.yahoo.ycsb.db; -import com.datastax.driver.core.*; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.Insert; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; -import com.yahoo.ycsb.*; -import java.nio.ByteBuffer; +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.Status; +import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.HashMap; import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; /** - * Tested with Cassandra 2.0, CQL client for YCSB framework + * Cassandra 2.x CQL client. * - * See {@code cassandra2} for a version compatible with Cassandra 2.1+. See - * {@code cassandra2/README.md} for details. + * See {@code cassandra2/README.md} for details. * * @author cmatser */ @@ -54,6 +67,7 @@ public class CassandraCQLClient extends DB { public static final String HOSTS_PROPERTY = "hosts"; public static final String PORT_PROPERTY = "port"; + public static final String PORT_PROPERTY_DEFAULT = "9042"; public static final String READ_CONSISTENCY_LEVEL_PROPERTY = "cassandra.readconsistencylevel"; @@ -62,6 +76,15 @@ public class CassandraCQLClient extends DB { "cassandra.writeconsistencylevel"; public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; + public static final String MAX_CONNECTIONS_PROPERTY = + "cassandra.maxconnections"; + public static final String CORE_CONNECTIONS_PROPERTY = + "cassandra.coreconnections"; + public static final String CONNECT_TIMEOUT_MILLIS_PROPERTY = + "cassandra.connecttimeoutmillis"; + public static final String READ_TIMEOUT_MILLIS_PROPERTY = + "cassandra.readtimeoutmillis"; + /** * Count the number of times initialized to teardown on the last * {@link #cleanup()}. @@ -101,12 +124,7 @@ public class CassandraCQLClient extends DB { HOSTS_PROPERTY)); } String[] hosts = host.split(","); - String port = getProperties().getProperty("port", "9042"); - if (port == null) { - throw new DBException(String.format( - "Required property \"%s\" missing for CassandraCQLClient", - PORT_PROPERTY)); - } + String port = getProperties().getProperty(PORT_PROPERTY, PORT_PROPERTY_DEFAULT); String username = getProperties().getProperty(USERNAME_PROPERTY); String password = getProperties().getProperty(PASSWORD_PROPERTY); @@ -121,7 +139,6 @@ public class CassandraCQLClient extends DB { getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - // public void connect(String node) {} if ((username != null) && !username.isEmpty()) { cluster = Cluster.builder().withCredentials(username, password) .withPort(Integer.valueOf(port)).addContactPoints(hosts).build(); @@ -130,21 +147,38 @@ public class CassandraCQLClient extends DB { .addContactPoints(hosts).build(); } - // Update number of connections based on threads - int threadcount = - Integer.parseInt(getProperties().getProperty("threadcount", "1")); - cluster.getConfiguration().getPoolingOptions() - .setMaxConnectionsPerHost(HostDistance.LOCAL, threadcount); + String maxConnections = getProperties().getProperty( + MAX_CONNECTIONS_PROPERTY); + if (maxConnections != null) { + cluster.getConfiguration().getPoolingOptions() + .setMaxConnectionsPerHost(HostDistance.LOCAL, + Integer.valueOf(maxConnections)); + } + + String coreConnections = getProperties().getProperty( + CORE_CONNECTIONS_PROPERTY); + if (coreConnections != null) { + cluster.getConfiguration().getPoolingOptions() + .setCoreConnectionsPerHost(HostDistance.LOCAL, + Integer.valueOf(coreConnections)); + } - // Set connection timeout 3min (default is 5s) - cluster.getConfiguration().getSocketOptions() - .setConnectTimeoutMillis(3 * 60 * 1000); - // Set read (execute) timeout 3min (default is 12s) - cluster.getConfiguration().getSocketOptions() - .setReadTimeoutMillis(3 * 60 * 1000); + String connectTimoutMillis = getProperties().getProperty( + CONNECT_TIMEOUT_MILLIS_PROPERTY); + if (connectTimoutMillis != null) { + cluster.getConfiguration().getSocketOptions() + .setConnectTimeoutMillis(Integer.valueOf(connectTimoutMillis)); + } + + String readTimoutMillis = getProperties().getProperty( + READ_TIMEOUT_MILLIS_PROPERTY); + if (readTimoutMillis != null) { + cluster.getConfiguration().getSocketOptions() + .setReadTimeoutMillis(Integer.valueOf(readTimoutMillis)); + } Metadata metadata = cluster.getMetadata(); - System.out.printf("Connected to cluster: %s\n", + System.err.printf("Connected to cluster: %s\n", metadata.getClusterName()); for (Host discoveredHost : metadata.getAllHosts()) { @@ -167,8 +201,19 @@ public class CassandraCQLClient extends DB { */ @Override public void cleanup() throws DBException { - if (INIT_COUNT.decrementAndGet() <= 0) { - cluster.shutdown(); + synchronized (INIT_COUNT) { + final int curInitCount = INIT_COUNT.decrementAndGet(); + if (curInitCount <= 0) { + session.close(); + cluster.close(); + cluster = null; + session = null; + } + if (curInitCount < 0) { + // This should never happen. + throw new DBException( + String.format("initCount is negative: %d", curInitCount)); + } } } @@ -189,7 +234,6 @@ public class CassandraCQLClient extends DB { @Override public Status read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) { - try { Statement stmt; Select.Builder selectBuilder; @@ -213,20 +257,21 @@ public class CassandraCQLClient extends DB { ResultSet rs = session.execute(stmt); + if (rs.isExhausted()) { + return Status.NOT_FOUND; + } + // Should be only 1 row - if (!rs.isExhausted()) { - Row row = rs.one(); - ColumnDefinitions cd = row.getColumnDefinitions(); + Row row = rs.one(); + ColumnDefinitions cd = row.getColumnDefinitions(); - for (ColumnDefinitions.Definition def : cd) { - ByteBuffer val = row.getBytesUnsafe(def.getName()); - if (val != null) { - result.put(def.getName(), new ByteArrayByteIterator(val.array())); - } else { - result.put(def.getName(), null); - } + for (ColumnDefinitions.Definition def : cd) { + ByteBuffer val = row.getBytesUnsafe(def.getName()); + if (val != null) { + result.put(def.getName(), new ByteArrayByteIterator(val.array())); + } else { + result.put(def.getName(), null); } - } return Status.OK; @@ -242,7 +287,7 @@ public class CassandraCQLClient extends DB { /** * Perform a range scan for a set of records in the database. Each field/value * pair from the result will be stored in a HashMap. - * + * * Cassandra CQL uses "token" method for range scan which doesn't always yield * intuitive results. * diff --git a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient10.java b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient10.java deleted file mode 100644 index b3d8e4a7d18824f13ed28a5dff051c69eb5e8ac4..0000000000000000000000000000000000000000 --- a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient10.java +++ /dev/null @@ -1,591 +0,0 @@ -/** - * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. - */ - -package com.yahoo.ycsb.db; - -import com.yahoo.ycsb.ByteArrayByteIterator; -import com.yahoo.ycsb.ByteIterator; -import com.yahoo.ycsb.DB; -import com.yahoo.ycsb.DBException; -import com.yahoo.ycsb.Status; -import com.yahoo.ycsb.StringByteIterator; -import com.yahoo.ycsb.Utils; - -import org.apache.cassandra.thrift.AuthenticationRequest; -import org.apache.cassandra.thrift.Cassandra; -import org.apache.cassandra.thrift.Column; -import org.apache.cassandra.thrift.ColumnOrSuperColumn; -import org.apache.cassandra.thrift.ColumnParent; -import org.apache.cassandra.thrift.ColumnPath; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.thrift.KeyRange; -import org.apache.cassandra.thrift.KeySlice; -import org.apache.cassandra.thrift.Mutation; -import org.apache.cassandra.thrift.SlicePredicate; -import org.apache.cassandra.thrift.SliceRange; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.Vector; - -//XXXX if we do replication, fix the consistency levels -/** - * Cassandra 1.0.6 client for YCSB framework. - */ -public class CassandraClient10 extends DB { - public static final int OK = 0; - public static final int ERROR = -1; - public static final ByteBuffer EMPTY_BYTE_BUFFER = - ByteBuffer.wrap(new byte[0]); - - public static final String CONNECTION_RETRY_PROPERTY = - "cassandra.connectionretries"; - public static final String CONNECTION_RETRY_PROPERTY_DEFAULT = "300"; - - public static final String OPERATION_RETRY_PROPERTY = - "cassandra.operationretries"; - public static final String OPERATION_RETRY_PROPERTY_DEFAULT = "300"; - - public static final String USERNAME_PROPERTY = "cassandra.username"; - public static final String PASSWORD_PROPERTY = "cassandra.password"; - - public static final String COLUMN_FAMILY_PROPERTY = "cassandra.columnfamily"; - public static final String COLUMN_FAMILY_PROPERTY_DEFAULT = "data"; - - public static final String READ_CONSISTENCY_LEVEL_PROPERTY = - "cassandra.readconsistencylevel"; - public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; - - public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = - "cassandra.writeconsistencylevel"; - public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; - - public static final String SCAN_CONSISTENCY_LEVEL_PROPERTY = - "cassandra.scanconsistencylevel"; - public static final String SCAN_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; - - public static final String DELETE_CONSISTENCY_LEVEL_PROPERTY = - "cassandra.deleteconsistencylevel"; - public static final String DELETE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; - - private int connectionRetries; - private int operationRetries; - private String columnFamily; - - private TTransport tr; - private Cassandra.Client client; - - private boolean debug = false; - - private String tableName = ""; - private Exception errorexception = null; - - private List<Mutation> mutations = new ArrayList<Mutation>(); - private Map<String, List<Mutation>> mutationMap = - new HashMap<String, List<Mutation>>(); - private Map<ByteBuffer, Map<String, List<Mutation>>> record = - new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); - - private ColumnParent parent; - - private ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE; - private ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE; - private ConsistencyLevel scanConsistencyLevel = ConsistencyLevel.ONE; - private ConsistencyLevel deleteConsistencyLevel = ConsistencyLevel.ONE; - - /** - * Initialize any state for this DB. Called once per DB instance; there is one - * DB instance per client thread. - */ - public void init() throws DBException { - String hosts = getProperties().getProperty("hosts"); - if (hosts == null) { - throw new DBException( - "Required property \"hosts\" missing for CassandraClient"); - } - - columnFamily = getProperties().getProperty(COLUMN_FAMILY_PROPERTY, - COLUMN_FAMILY_PROPERTY_DEFAULT); - parent = new ColumnParent(columnFamily); - - connectionRetries = - Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY, - CONNECTION_RETRY_PROPERTY_DEFAULT)); - operationRetries = - Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY, - OPERATION_RETRY_PROPERTY_DEFAULT)); - - String username = getProperties().getProperty(USERNAME_PROPERTY); - String password = getProperties().getProperty(PASSWORD_PROPERTY); - - readConsistencyLevel = ConsistencyLevel - .valueOf(getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, - READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - writeConsistencyLevel = ConsistencyLevel - .valueOf(getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, - WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - scanConsistencyLevel = ConsistencyLevel - .valueOf(getProperties().getProperty(SCAN_CONSISTENCY_LEVEL_PROPERTY, - SCAN_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - deleteConsistencyLevel = ConsistencyLevel - .valueOf(getProperties().getProperty(DELETE_CONSISTENCY_LEVEL_PROPERTY, - DELETE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - - debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); - - String[] allhosts = hosts.split(","); - String myhost = allhosts[Utils.random().nextInt(allhosts.length)]; - - Exception connectexception = null; - - for (int retry = 0; retry < connectionRetries; retry++) { - tr = new TFramedTransport(new TSocket(myhost, - Integer.parseInt(getProperties().getProperty("port", "9160")))); - TProtocol proto = new TBinaryProtocol(tr); - client = new Cassandra.Client(proto); - try { - tr.open(); - connectexception = null; - break; - } catch (Exception e) { - connectexception = e; - } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - if (connectexception != null) { - System.err.println("Unable to connect to " + myhost + " after " - + connectionRetries + " tries"); - throw new DBException(connectexception); - } - - if (username != null && password != null) { - Map<String, String> cred = new HashMap<String, String>(); - cred.put("username", username); - cred.put("password", password); - AuthenticationRequest req = new AuthenticationRequest(cred); - try { - client.login(req); - } catch (Exception e) { - throw new DBException(e); - } - } - } - - /** - * Cleanup any state for this DB. Called once per DB instance; there is one DB - * instance per client thread. - */ - public void cleanup() throws DBException { - tr.close(); - } - - /** - * Read a record from the database. Each field/value pair from the result will - * be stored in a HashMap. - * - * @param table - * The name of the table - * @param key - * The record key of the record to read. - * @param fields - * The list of fields to read, or null for all of them - * @param result - * A HashMap of field/value pairs for the result - * @return Zero on success, a non-zero error code on error - */ - public Status read(String table, String key, Set<String> fields, - HashMap<String, ByteIterator> result) { - if (!tableName.equals(table)) { - try { - client.set_keyspace(table); - tableName = table; - } catch (Exception e) { - e.printStackTrace(); - e.printStackTrace(System.out); - return Status.ERROR; - } - } - - for (int i = 0; i < operationRetries; i++) { - - try { - SlicePredicate predicate; - if (fields == null) { - predicate = new SlicePredicate().setSlice_range(new SliceRange( - EMPTY_BYTE_BUFFER, EMPTY_BYTE_BUFFER, false, 1000000)); - - } else { - ArrayList<ByteBuffer> fieldlist = - new ArrayList<ByteBuffer>(fields.size()); - for (String s : fields) { - fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); - } - - predicate = new SlicePredicate().setColumn_names(fieldlist); - } - - List<ColumnOrSuperColumn> results = - client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent, - predicate, readConsistencyLevel); - - if (debug) { - System.out.print("Reading key: " + key); - } - - Column column; - String name; - ByteIterator value; - for (ColumnOrSuperColumn oneresult : results) { - - column = oneresult.column; - name = new String(column.name.array(), - column.name.position() + column.name.arrayOffset(), - column.name.remaining()); - value = new ByteArrayByteIterator(column.value.array(), - column.value.position() + column.value.arrayOffset(), - column.value.remaining()); - - result.put(name, value); - - if (debug) { - System.out.print("(" + name + "=" + value + ")"); - } - } - - if (debug) { - System.out.println(); - System.out - .println("ConsistencyLevel=" + readConsistencyLevel.toString()); - } - - return Status.OK; - } catch (Exception e) { - errorexception = e; - } - - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - errorexception.printStackTrace(); - errorexception.printStackTrace(System.out); - return Status.ERROR; - - } - - /** - * Perform a range scan for a set of records in the database. Each field/value - * pair from the result will be stored in a HashMap. - * - * @param table - * The name of the table - * @param startkey - * The record key of the first record to read. - * @param recordcount - * The number of records to read - * @param fields - * The list of fields to read, or null for all of them - * @param result - * A Vector of HashMaps, where each HashMap is a set field/value - * pairs for one record - * @return Zero on success, a non-zero error code on error - */ - public Status scan(String table, String startkey, int recordcount, - Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { - if (!tableName.equals(table)) { - try { - client.set_keyspace(table); - tableName = table; - } catch (Exception e) { - e.printStackTrace(); - e.printStackTrace(System.out); - return Status.ERROR; - } - } - - for (int i = 0; i < operationRetries; i++) { - - try { - SlicePredicate predicate; - if (fields == null) { - predicate = new SlicePredicate().setSlice_range(new SliceRange( - EMPTY_BYTE_BUFFER, EMPTY_BYTE_BUFFER, false, 1000000)); - - } else { - ArrayList<ByteBuffer> fieldlist = - new ArrayList<ByteBuffer>(fields.size()); - for (String s : fields) { - fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); - } - - predicate = new SlicePredicate().setColumn_names(fieldlist); - } - - KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8")) - .setEnd_key(new byte[] {}).setCount(recordcount); - - List<KeySlice> results = client.get_range_slices(parent, predicate, kr, - scanConsistencyLevel); - - if (debug) { - System.out.println("Scanning startkey: " + startkey); - } - - HashMap<String, ByteIterator> tuple; - for (KeySlice oneresult : results) { - tuple = new HashMap<String, ByteIterator>(); - - Column column; - String name; - ByteIterator value; - for (ColumnOrSuperColumn onecol : oneresult.columns) { - column = onecol.column; - name = new String(column.name.array(), - column.name.position() + column.name.arrayOffset(), - column.name.remaining()); - value = new ByteArrayByteIterator(column.value.array(), - column.value.position() + column.value.arrayOffset(), - column.value.remaining()); - - tuple.put(name, value); - - if (debug) { - System.out.print("(" + name + "=" + value + ")"); - } - } - - result.add(tuple); - if (debug) { - System.out.println(); - System.out - .println("ConsistencyLevel=" + scanConsistencyLevel.toString()); - } - } - - return Status.OK; - } catch (Exception e) { - errorexception = e; - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - errorexception.printStackTrace(); - errorexception.printStackTrace(System.out); - return Status.ERROR; - } - - /** - * Update a record in the database. Any field/value pairs in the specified - * values HashMap will be written into the record with the specified record - * key, overwriting any existing values with the same field name. - * - * @param table - * The name of the table - * @param key - * The record key of the record to write. - * @param values - * A HashMap of field/value pairs to update in the record - * @return Zero on success, a non-zero error code on error - */ - public Status update(String table, String key, - HashMap<String, ByteIterator> values) { - return insert(table, key, values); - } - - /** - * Insert a record in the database. Any field/value pairs in the specified - * values HashMap will be written into the record with the specified record - * key. - * - * @param table - * The name of the table - * @param key - * The record key of the record to insert. - * @param values - * A HashMap of field/value pairs to insert in the record - * @return Zero on success, a non-zero error code on error - */ - public Status insert(String table, String key, - HashMap<String, ByteIterator> values) { - if (!tableName.equals(table)) { - try { - client.set_keyspace(table); - tableName = table; - } catch (Exception e) { - e.printStackTrace(); - e.printStackTrace(System.out); - return Status.ERROR; - } - } - - for (int i = 0; i < operationRetries; i++) { - mutations.clear(); - mutationMap.clear(); - record.clear(); - - if (debug) { - System.out.println("Inserting key: " + key); - } - - try { - ByteBuffer wrappedKey = ByteBuffer.wrap(key.getBytes("UTF-8")); - - Column col; - ColumnOrSuperColumn column; - for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { - col = new Column(); - col.setName(ByteBuffer.wrap(entry.getKey().getBytes("UTF-8"))); - col.setValue(ByteBuffer.wrap(entry.getValue().toArray())); - col.setTimestamp(System.currentTimeMillis()); - - column = new ColumnOrSuperColumn(); - column.setColumn(col); - - mutations.add(new Mutation().setColumn_or_supercolumn(column)); - } - - mutationMap.put(columnFamily, mutations); - record.put(wrappedKey, mutationMap); - - client.batch_mutate(record, writeConsistencyLevel); - - if (debug) { - System.out - .println("ConsistencyLevel=" + writeConsistencyLevel.toString()); - } - - return Status.OK; - } catch (Exception e) { - errorexception = e; - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - errorexception.printStackTrace(); - errorexception.printStackTrace(System.out); - return Status.ERROR; - } - - /** - * Delete a record from the database. - * - * @param table - * The name of the table - * @param key - * The record key of the record to delete. - * @return Zero on success, a non-zero error code on error - */ - public Status delete(String table, String key) { - if (!tableName.equals(table)) { - try { - client.set_keyspace(table); - tableName = table; - } catch (Exception e) { - e.printStackTrace(); - e.printStackTrace(System.out); - return Status.ERROR; - } - } - - for (int i = 0; i < operationRetries; i++) { - try { - client.remove(ByteBuffer.wrap(key.getBytes("UTF-8")), - new ColumnPath(columnFamily), System.currentTimeMillis(), - deleteConsistencyLevel); - - if (debug) { - System.out.println("Delete key: " + key); - System.out - .println("ConsistencyLevel=" + deleteConsistencyLevel.toString()); - } - - return Status.OK; - } catch (Exception e) { - errorexception = e; - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - errorexception.printStackTrace(); - errorexception.printStackTrace(System.out); - return Status.ERROR; - } - - public static void main(String[] args) { - CassandraClient10 cli = new CassandraClient10(); - - Properties props = new Properties(); - - props.setProperty("hosts", args[0]); - cli.setProperties(props); - - try { - cli.init(); - } catch (Exception e) { - e.printStackTrace(); - System.exit(0); - } - - HashMap<String, ByteIterator> vals = new HashMap<String, ByteIterator>(); - vals.put("age", new StringByteIterator("57")); - vals.put("middlename", new StringByteIterator("bradley")); - vals.put("favoritecolor", new StringByteIterator("blue")); - Status res = cli.insert("usertable", "BrianFrankCooper", vals); - System.out.println("Result of insert: " + res); - - HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>(); - HashSet<String> fields = new HashSet<String>(); - fields.add("middlename"); - fields.add("age"); - fields.add("favoritecolor"); - res = cli.read("usertable", "BrianFrankCooper", null, result); - System.out.println("Result of read: " + res); - for (Map.Entry<String, ByteIterator> entry : result.entrySet()) { - System.out.println("[" + entry.getKey() + "]=[" + entry.getValue() + "]"); - } - - res = cli.delete("usertable", "BrianFrankCooper"); - System.out.println("Result of delete: " + res); - } -} diff --git a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient7.java b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient7.java deleted file mode 100644 index f2075e7f77780a8a609d03af84aa127ad7d5419b..0000000000000000000000000000000000000000 --- a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient7.java +++ /dev/null @@ -1,549 +0,0 @@ -/** - * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. - */ - -package com.yahoo.ycsb.db; - -import com.yahoo.ycsb.ByteArrayByteIterator; -import com.yahoo.ycsb.ByteIterator; -import com.yahoo.ycsb.DB; -import com.yahoo.ycsb.DBException; -import com.yahoo.ycsb.Status; -import com.yahoo.ycsb.StringByteIterator; -import com.yahoo.ycsb.Utils; - -import org.apache.cassandra.thrift.AuthenticationRequest; -import org.apache.cassandra.thrift.Cassandra; -import org.apache.cassandra.thrift.Column; -import org.apache.cassandra.thrift.ColumnOrSuperColumn; -import org.apache.cassandra.thrift.ColumnParent; -import org.apache.cassandra.thrift.ColumnPath; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.thrift.KeyRange; -import org.apache.cassandra.thrift.KeySlice; -import org.apache.cassandra.thrift.Mutation; -import org.apache.cassandra.thrift.SlicePredicate; -import org.apache.cassandra.thrift.SliceRange; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.Vector; - -//XXXX if we do replication, fix the consistency levels -/** - * Cassandra 0.7 client for YCSB framework. - */ -public class CassandraClient7 extends DB { - public static final ByteBuffer EMPTY_BYTE_BUFFER = - ByteBuffer.wrap(new byte[0]); - - public static final String CONNECTION_RETRY_PROPERTY = - "cassandra.connectionretries"; - public static final String CONNECTION_RETRY_PROPERTY_DEFAULT = "300"; - - public static final String OPERATION_RETRY_PROPERTY = - "cassandra.operationretries"; - public static final String OPERATION_RETRY_PROPERTY_DEFAULT = "300"; - - public static final String USERNAME_PROPERTY = "cassandra.username"; - public static final String PASSWORD_PROPERTY = "cassandra.password"; - - public static final String COLUMN_FAMILY_PROPERTY = "cassandra.columnfamily"; - public static final String COLUMN_FAMILY_PROPERTY_DEFAULT = "data"; - - private int connectionRetries; - private int operationRetries; - private String columnFamily; - - private TTransport tr; - private Cassandra.Client client; - - private boolean debug = false; - - private String tableName = ""; - private Exception errorexception = null; - - private List<Mutation> mutations = new ArrayList<Mutation>(); - private Map<String, List<Mutation>> mutationMap = - new HashMap<String, List<Mutation>>(); - private Map<ByteBuffer, Map<String, List<Mutation>>> record = - new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); - - private ColumnParent parent; - - /** - * Initialize any state for this DB. Called once per DB instance; there is one - * DB instance per client thread. - */ - public void init() throws DBException { - String hosts = getProperties().getProperty("hosts"); - if (hosts == null) { - throw new DBException( - "Required property \"hosts\" missing for CassandraClient"); - } - - columnFamily = getProperties().getProperty(COLUMN_FAMILY_PROPERTY, - COLUMN_FAMILY_PROPERTY_DEFAULT); - parent = new ColumnParent(columnFamily); - - connectionRetries = - Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY, - CONNECTION_RETRY_PROPERTY_DEFAULT)); - operationRetries = - Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY, - OPERATION_RETRY_PROPERTY_DEFAULT)); - - String username = getProperties().getProperty(USERNAME_PROPERTY); - String password = getProperties().getProperty(PASSWORD_PROPERTY); - - debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); - - String[] allhosts = hosts.split(","); - String myhost = allhosts[Utils.random().nextInt(allhosts.length)]; - - Exception connectexception = null; - - for (int retry = 0; retry < connectionRetries; retry++) { - tr = new TFramedTransport(new TSocket(myhost, 9160)); - TProtocol proto = new TBinaryProtocol(tr); - client = new Cassandra.Client(proto); - try { - tr.open(); - connectexception = null; - break; - } catch (Exception e) { - connectexception = e; - } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - if (connectexception != null) { - System.err.println("Unable to connect to " + myhost + " after " - + connectionRetries + " tries"); - System.out.println("Unable to connect to " + myhost + " after " - + connectionRetries + " tries"); - throw new DBException(connectexception); - } - - if (username != null && password != null) { - Map<String, String> cred = new HashMap<String, String>(); - cred.put("username", username); - cred.put("password", password); - AuthenticationRequest req = new AuthenticationRequest(cred); - try { - client.login(req); - } catch (Exception e) { - throw new DBException(e); - } - } - } - - /** - * Cleanup any state for this DB. Called once per DB instance; there is one DB - * instance per client thread. - */ - public void cleanup() throws DBException { - tr.close(); - } - - /** - * Read a record from the database. Each field/value pair from the result will - * be stored in a HashMap. - * - * @param table - * The name of the table - * @param key - * The record key of the record to read. - * @param fields - * The list of fields to read, or null for all of them - * @param result - * A HashMap of field/value pairs for the result - * @return Zero on success, a non-zero error code on error - */ - public Status read(String table, String key, Set<String> fields, - HashMap<String, ByteIterator> result) { - if (!tableName.equals(table)) { - try { - client.set_keyspace(table); - tableName = table; - } catch (Exception e) { - e.printStackTrace(); - e.printStackTrace(System.out); - return Status.ERROR; - } - } - - for (int i = 0; i < operationRetries; i++) { - - try { - SlicePredicate predicate; - if (fields == null) { - SliceRange range = new SliceRange(EMPTY_BYTE_BUFFER, - EMPTY_BYTE_BUFFER, false, 1000000); - - predicate = new SlicePredicate().setSlice_range(range); - - } else { - ArrayList<ByteBuffer> fieldlist = - new ArrayList<ByteBuffer>(fields.size()); - for (String s : fields) { - fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); - } - - predicate = new SlicePredicate().setColumn_names(fieldlist); - } - - List<ColumnOrSuperColumn> results = - client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent, - predicate, ConsistencyLevel.ONE); - - if (debug) { - System.out.print("Reading key: " + key); - } - - Column column; - String name; - ByteIterator value; - for (ColumnOrSuperColumn oneresult : results) { - - column = oneresult.column; - name = new String(column.name.array(), - column.name.position() + column.name.arrayOffset(), - column.name.remaining()); - value = new ByteArrayByteIterator(column.value.array(), - column.value.position() + column.value.arrayOffset(), - column.value.remaining()); - - result.put(name, value); - - if (debug) { - System.out.print("(" + name + "=" + value + ")"); - } - } - - if (debug) { - System.out.println(); - } - - return Status.OK; - } catch (Exception e) { - errorexception = e; - } - - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - errorexception.printStackTrace(); - errorexception.printStackTrace(System.out); - return Status.ERROR; - - } - - /** - * Perform a range scan for a set of records in the database. Each field/value - * pair from the result will be stored in a HashMap. - * - * @param table - * The name of the table - * @param startkey - * The record key of the first record to read. - * @param recordcount - * The number of records to read - * @param fields - * The list of fields to read, or null for all of them - * @param result - * A Vector of HashMaps, where each HashMap is a set field/value - * pairs for one record - * @return Zero on success, a non-zero error code on error - */ - public Status scan(String table, String startkey, int recordcount, - Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { - if (!tableName.equals(table)) { - try { - client.set_keyspace(table); - tableName = table; - } catch (Exception e) { - e.printStackTrace(); - e.printStackTrace(System.out); - return Status.ERROR; - } - } - - for (int i = 0; i < operationRetries; i++) { - - try { - SlicePredicate predicate; - if (fields == null) { - SliceRange range = new SliceRange(EMPTY_BYTE_BUFFER, - EMPTY_BYTE_BUFFER, false, 1000000); - - predicate = new SlicePredicate().setSlice_range(range); - - } else { - ArrayList<ByteBuffer> fieldlist = - new ArrayList<ByteBuffer>(fields.size()); - for (String s : fields) { - fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); - } - - predicate = new SlicePredicate().setColumn_names(fieldlist); - } - - KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8")) - .setEnd_key(new byte[] {}).setCount(recordcount); - - List<KeySlice> results = client.get_range_slices(parent, predicate, kr, - ConsistencyLevel.ONE); - - if (debug) { - System.out.println("Scanning startkey: " + startkey); - } - - HashMap<String, ByteIterator> tuple; - for (KeySlice oneresult : results) { - tuple = new HashMap<String, ByteIterator>(); - - Column column; - String name; - ByteIterator value; - for (ColumnOrSuperColumn onecol : oneresult.columns) { - column = onecol.column; - name = new String(column.name.array(), - column.name.position() + column.name.arrayOffset(), - column.name.remaining()); - value = new ByteArrayByteIterator(column.value.array(), - column.value.position() + column.value.arrayOffset(), - column.value.remaining()); - - tuple.put(name, value); - - if (debug) { - System.out.print("(" + name + "=" + value + ")"); - } - } - - result.add(tuple); - if (debug) { - System.out.println(); - } - } - - return Status.OK; - } catch (Exception e) { - errorexception = e; - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - errorexception.printStackTrace(); - errorexception.printStackTrace(System.out); - return Status.ERROR; - } - - /** - * Update a record in the database. Any field/value pairs in the specified - * values HashMap will be written into the record with the specified record - * key, overwriting any existing values with the same field name. - * - * @param table - * The name of the table - * @param key - * The record key of the record to write. - * @param values - * A HashMap of field/value pairs to update in the record - * @return Zero on success, a non-zero error code on error - */ - public Status update(String table, String key, - HashMap<String, ByteIterator> values) { - return insert(table, key, values); - } - - /** - * Insert a record in the database. Any field/value pairs in the specified - * values HashMap will be written into the record with the specified record - * key. - * - * @param table - * The name of the table - * @param key - * The record key of the record to insert. - * @param values - * A HashMap of field/value pairs to insert in the record - * @return Zero on success, a non-zero error code on error - */ - public Status insert(String table, String key, - HashMap<String, ByteIterator> values) { - if (!tableName.equals(table)) { - try { - client.set_keyspace(table); - tableName = table; - } catch (Exception e) { - e.printStackTrace(); - e.printStackTrace(System.out); - return Status.ERROR; - } - } - - for (int i = 0; i < operationRetries; i++) { - mutations.clear(); - mutationMap.clear(); - record.clear(); - - if (debug) { - System.out.println("Inserting key: " + key); - } - - try { - ByteBuffer wrappedKey = ByteBuffer.wrap(key.getBytes("UTF-8")); - - Column col; - ColumnOrSuperColumn column; - for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { - col = new Column(); - col.setName(ByteBuffer.wrap(entry.getKey().getBytes("UTF-8"))); - col.setValue(ByteBuffer.wrap(entry.getValue().toArray())); - col.setTimestamp(System.currentTimeMillis()); - - column = new ColumnOrSuperColumn(); - column.setColumn(col); - - mutations.add(new Mutation().setColumn_or_supercolumn(column)); - } - - mutationMap.put(columnFamily, mutations); - record.put(wrappedKey, mutationMap); - - client.batch_mutate(record, ConsistencyLevel.ONE); - - return Status.OK; - } catch (Exception e) { - errorexception = e; - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - errorexception.printStackTrace(); - errorexception.printStackTrace(System.out); - return Status.ERROR; - } - - /** - * Delete a record from the database. - * - * @param table - * The name of the table - * @param key - * The record key of the record to delete. - * @return Zero on success, a non-zero error code on error - */ - public Status delete(String table, String key) { - if (!tableName.equals(table)) { - try { - client.set_keyspace(table); - tableName = table; - } catch (Exception e) { - e.printStackTrace(); - e.printStackTrace(System.out); - return Status.ERROR; - } - } - - for (int i = 0; i < operationRetries; i++) { - try { - client.remove(ByteBuffer.wrap(key.getBytes("UTF-8")), - new ColumnPath(columnFamily), System.currentTimeMillis(), - ConsistencyLevel.ONE); - - if (debug) { - System.out.println("Delete key: " + key); - } - - return Status.OK; - } catch (Exception e) { - errorexception = e; - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - errorexception.printStackTrace(); - errorexception.printStackTrace(System.out); - return Status.ERROR; - } - - public static void main(String[] args) { - CassandraClient7 cli = new CassandraClient7(); - - Properties props = new Properties(); - - props.setProperty("hosts", args[0]); - cli.setProperties(props); - - try { - cli.init(); - } catch (Exception e) { - e.printStackTrace(); - System.exit(0); - } - - HashMap<String, ByteIterator> vals = new HashMap<String, ByteIterator>(); - vals.put("age", new StringByteIterator("57")); - vals.put("middlename", new StringByteIterator("bradley")); - vals.put("favoritecolor", new StringByteIterator("blue")); - Status res = cli.insert("usertable", "BrianFrankCooper", vals); - System.out.println("Result of insert: " + res.getName()); - - HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>(); - HashSet<String> fields = new HashSet<String>(); - fields.add("middlename"); - fields.add("age"); - fields.add("favoritecolor"); - res = cli.read("usertable", "BrianFrankCooper", null, result); - System.out.println("Result of read: " + res.getName()); - for (Map.Entry<String, ByteIterator> entry : result.entrySet()) { - System.out.println("[" + entry.getKey() + "]=[" + entry.getValue() + "]"); - } - - res = cli.delete("usertable", "BrianFrankCooper"); - System.out.println("Result of delete: " + res.getName()); - } -} diff --git a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient8.java b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient8.java deleted file mode 100644 index ca72c339da797073df4da5e0e9d1af42c612a360..0000000000000000000000000000000000000000 --- a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient8.java +++ /dev/null @@ -1,528 +0,0 @@ -/** - * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. - */ - -package com.yahoo.ycsb.db; - -import com.yahoo.ycsb.*; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Vector; -import java.util.Properties; -import java.nio.ByteBuffer; - -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.cassandra.thrift.*; - -//XXXX if we do replication, fix the consistency levels -/** - * Cassandra 0.8 client for YCSB framework. - */ -public class CassandraClient8 extends DB { - public static final ByteBuffer EMPTY_BYTE_BUFFER = - ByteBuffer.wrap(new byte[0]); - - public static final String CONNECTION_RETRY_PROPERTY = - "cassandra.connectionretries"; - public static final String CONNECTION_RETRY_PROPERTY_DEFAULT = "300"; - - public static final String OPERATION_RETRY_PROPERTY = - "cassandra.operationretries"; - public static final String OPERATION_RETRY_PROPERTY_DEFAULT = "300"; - - public static final String USERNAME_PROPERTY = "cassandra.username"; - public static final String PASSWORD_PROPERTY = "cassandra.password"; - - public static final String COLUMN_FAMILY_PROPERTY = "cassandra.columnfamily"; - public static final String COLUMN_FAMILY_PROPERTY_DEFAULT = "data"; - - private int connectionRetries; - private int operationRetries; - private String columnFamily; - - private TTransport tr; - private Cassandra.Client client; - - private boolean debug = false; - - private String tableName = ""; - private Exception errorexception = null; - - private List<Mutation> mutations = new ArrayList<Mutation>(); - private Map<String, List<Mutation>> mutationMap = - new HashMap<String, List<Mutation>>(); - private Map<ByteBuffer, Map<String, List<Mutation>>> record = - new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); - - private ColumnParent parent; - - /** - * Initialize any state for this DB. Called once per DB instance; there is one - * DB instance per client thread. - */ - public void init() throws DBException { - String hosts = getProperties().getProperty("hosts"); - if (hosts == null) { - throw new DBException( - "Required property \"hosts\" missing for CassandraClient"); - } - - columnFamily = getProperties().getProperty(COLUMN_FAMILY_PROPERTY, - COLUMN_FAMILY_PROPERTY_DEFAULT); - parent = new ColumnParent(columnFamily); - - connectionRetries = - Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY, - CONNECTION_RETRY_PROPERTY_DEFAULT)); - operationRetries = - Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY, - OPERATION_RETRY_PROPERTY_DEFAULT)); - - String username = getProperties().getProperty(USERNAME_PROPERTY); - String password = getProperties().getProperty(PASSWORD_PROPERTY); - - debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); - - String[] allhosts = hosts.split(","); - String myhost = allhosts[Utils.random().nextInt(allhosts.length)]; - - Exception connectexception = null; - - for (int retry = 0; retry < connectionRetries; retry++) { - tr = new TFramedTransport(new TSocket(myhost, 9160)); - TProtocol proto = new TBinaryProtocol(tr); - client = new Cassandra.Client(proto); - try { - tr.open(); - connectexception = null; - break; - } catch (Exception e) { - connectexception = e; - } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - if (connectexception != null) { - System.err.println("Unable to connect to " + myhost + " after " - + connectionRetries + " tries"); - System.out.println("Unable to connect to " + myhost + " after " - + connectionRetries + " tries"); - throw new DBException(connectexception); - } - - if (username != null && password != null) { - Map<String, String> cred = new HashMap<String, String>(); - cred.put("username", username); - cred.put("password", password); - AuthenticationRequest req = new AuthenticationRequest(cred); - try { - client.login(req); - } catch (Exception e) { - throw new DBException(e); - } - } - } - - /** - * Cleanup any state for this DB. Called once per DB instance; there is one DB - * instance per client thread. - */ - public void cleanup() throws DBException { - tr.close(); - } - - /** - * Read a record from the database. Each field/value pair from the result will - * be stored in a HashMap. - * - * @param table - * The name of the table - * @param key - * The record key of the record to read. - * @param fields - * The list of fields to read, or null for all of them - * @param result - * A HashMap of field/value pairs for the result - * @return Zero on success, a non-zero error code on error - */ - public Status read(String table, String key, Set<String> fields, - HashMap<String, ByteIterator> result) { - if (!tableName.equals(table)) { - try { - client.set_keyspace(table); - tableName = table; - } catch (Exception e) { - e.printStackTrace(); - e.printStackTrace(System.out); - return Status.ERROR; - } - } - - for (int i = 0; i < operationRetries; i++) { - - try { - SlicePredicate predicate; - if (fields == null) { - predicate = new SlicePredicate().setSlice_range(new SliceRange( - EMPTY_BYTE_BUFFER, EMPTY_BYTE_BUFFER, false, 1000000)); - - } else { - ArrayList<ByteBuffer> fieldlist = - new ArrayList<ByteBuffer>(fields.size()); - for (String s : fields) { - fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); - } - - predicate = new SlicePredicate().setColumn_names(fieldlist); - } - - List<ColumnOrSuperColumn> results = - client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent, - predicate, ConsistencyLevel.ONE); - - if (debug) { - System.out.print("Reading key: " + key); - } - - Column column; - String name; - ByteIterator value; - for (ColumnOrSuperColumn oneresult : results) { - - column = oneresult.column; - name = new String(column.name.array(), - column.name.position() + column.name.arrayOffset(), - column.name.remaining()); - value = new ByteArrayByteIterator(column.value.array(), - column.value.position() + column.value.arrayOffset(), - column.value.remaining()); - - result.put(name, value); - - if (debug) { - System.out.print("(" + name + "=" + value + ")"); - } - } - - if (debug) { - System.out.println(); - } - - return Status.OK; - } catch (Exception e) { - errorexception = e; - } - - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - errorexception.printStackTrace(); - errorexception.printStackTrace(System.out); - return Status.ERROR; - - } - - /** - * Perform a range scan for a set of records in the database. Each field/value - * pair from the result will be stored in a HashMap. - * - * @param table - * The name of the table - * @param startkey - * The record key of the first record to read. - * @param recordcount - * The number of records to read - * @param fields - * The list of fields to read, or null for all of them - * @param result - * A Vector of HashMaps, where each HashMap is a set field/value - * pairs for one record - * @return Zero on success, a non-zero error code on error - */ - public Status scan(String table, String startkey, int recordcount, - Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { - if (!tableName.equals(table)) { - try { - client.set_keyspace(table); - tableName = table; - } catch (Exception e) { - e.printStackTrace(); - e.printStackTrace(System.out); - return Status.ERROR; - } - } - - for (int i = 0; i < operationRetries; i++) { - - try { - SlicePredicate predicate; - if (fields == null) { - predicate = new SlicePredicate().setSlice_range(new SliceRange( - EMPTY_BYTE_BUFFER, EMPTY_BYTE_BUFFER, false, 1000000)); - - } else { - ArrayList<ByteBuffer> fieldlist = - new ArrayList<ByteBuffer>(fields.size()); - for (String s : fields) { - fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); - } - - predicate = new SlicePredicate().setColumn_names(fieldlist); - } - - KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8")) - .setEnd_key(new byte[] {}).setCount(recordcount); - - List<KeySlice> results = client.get_range_slices(parent, predicate, kr, - ConsistencyLevel.ONE); - - if (debug) { - System.out.println("Scanning startkey: " + startkey); - } - - HashMap<String, ByteIterator> tuple; - for (KeySlice oneresult : results) { - tuple = new HashMap<String, ByteIterator>(); - - Column column; - String name; - ByteIterator value; - for (ColumnOrSuperColumn onecol : oneresult.columns) { - column = onecol.column; - name = new String(column.name.array(), - column.name.position() + column.name.arrayOffset(), - column.name.remaining()); - value = new ByteArrayByteIterator(column.value.array(), - column.value.position() + column.value.arrayOffset(), - column.value.remaining()); - - tuple.put(name, value); - - if (debug) { - System.out.print("(" + name + "=" + value + ")"); - } - } - - result.add(tuple); - if (debug) { - System.out.println(); - } - } - - return Status.OK; - } catch (Exception e) { - errorexception = e; - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - errorexception.printStackTrace(); - errorexception.printStackTrace(System.out); - return Status.ERROR; - } - - /** - * Update a record in the database. Any field/value pairs in the specified - * values HashMap will be written into the record with the specified record - * key, overwriting any existing values with the same field name. - * - * @param table - * The name of the table - * @param key - * The record key of the record to write. - * @param values - * A HashMap of field/value pairs to update in the record - * @return Zero on success, a non-zero error code on error - */ - public Status update(String table, String key, - HashMap<String, ByteIterator> values) { - return insert(table, key, values); - } - - /** - * Insert a record in the database. Any field/value pairs in the specified - * values HashMap will be written into the record with the specified record - * key. - * - * @param table - * The name of the table - * @param key - * The record key of the record to insert. - * @param values - * A HashMap of field/value pairs to insert in the record - * @return Zero on success, a non-zero error code on error - */ - public Status insert(String table, String key, - HashMap<String, ByteIterator> values) { - if (!tableName.equals(table)) { - try { - client.set_keyspace(table); - tableName = table; - } catch (Exception e) { - e.printStackTrace(); - e.printStackTrace(System.out); - return Status.ERROR; - } - } - - for (int i = 0; i < operationRetries; i++) { - mutations.clear(); - mutationMap.clear(); - record.clear(); - - if (debug) { - System.out.println("Inserting key: " + key); - } - - try { - ByteBuffer wrappedKey = ByteBuffer.wrap(key.getBytes("UTF-8")); - - Column col; - ColumnOrSuperColumn column; - for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { - col = new Column(); - col.setName(ByteBuffer.wrap(entry.getKey().getBytes("UTF-8"))); - col.setValue(ByteBuffer.wrap(entry.getValue().toArray())); - col.setTimestamp(System.currentTimeMillis()); - - column = new ColumnOrSuperColumn(); - column.setColumn(col); - - mutations.add(new Mutation().setColumn_or_supercolumn(column)); - } - - mutationMap.put(columnFamily, mutations); - record.put(wrappedKey, mutationMap); - - client.batch_mutate(record, ConsistencyLevel.ONE); - - return Status.OK; - } catch (Exception e) { - errorexception = e; - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - errorexception.printStackTrace(); - errorexception.printStackTrace(System.out); - return Status.ERROR; - } - - /** - * Delete a record from the database. - * - * @param table - * The name of the table - * @param key - * The record key of the record to delete. - * @return Zero on success, a non-zero error code on error - */ - public Status delete(String table, String key) { - if (!tableName.equals(table)) { - try { - client.set_keyspace(table); - tableName = table; - } catch (Exception e) { - e.printStackTrace(); - e.printStackTrace(System.out); - return Status.ERROR; - } - } - - for (int i = 0; i < operationRetries; i++) { - try { - client.remove(ByteBuffer.wrap(key.getBytes("UTF-8")), - new ColumnPath(columnFamily), System.currentTimeMillis(), - ConsistencyLevel.ONE); - - if (debug) { - System.out.println("Delete key: " + key); - } - - return Status.OK; - } catch (Exception e) { - errorexception = e; - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - errorexception.printStackTrace(); - errorexception.printStackTrace(System.out); - return Status.ERROR; - } - - public static void main(String[] args) { - CassandraClient8 cli = new CassandraClient8(); - - Properties props = new Properties(); - - props.setProperty("hosts", args[0]); - cli.setProperties(props); - - try { - cli.init(); - } catch (Exception e) { - e.printStackTrace(); - System.exit(0); - } - - HashMap<String, ByteIterator> vals = new HashMap<String, ByteIterator>(); - vals.put("age", new StringByteIterator("57")); - vals.put("middlename", new StringByteIterator("bradley")); - vals.put("favoritecolor", new StringByteIterator("blue")); - Status res = cli.insert("usertable", "BrianFrankCooper", vals); - System.out.println("Result of insert: " + res); - - HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>(); - HashSet<String> fields = new HashSet<String>(); - fields.add("middlename"); - fields.add("age"); - fields.add("favoritecolor"); - res = cli.read("usertable", "BrianFrankCooper", null, result); - System.out.println("Result of read: " + res); - for (Map.Entry<String, ByteIterator> entry : result.entrySet()) { - System.out.println("[" + entry.getKey() + "]=[" + entry.getValue() + "]"); - } - - res = cli.delete("usertable", "BrianFrankCooper"); - System.out.println("Result of delete: " + res); - } -} diff --git a/cassandra/src/main/java/com/yahoo/ycsb/db/package-info.java b/cassandra/src/main/java/com/yahoo/ycsb/db/package-info.java index 88ce1f0111a374f41bf98efe0f5350beb6b9b140..007f01dc54dd9d543b5380dff68880f23b1c009b 100644 --- a/cassandra/src/main/java/com/yahoo/ycsb/db/package-info.java +++ b/cassandra/src/main/java/com/yahoo/ycsb/db/package-info.java @@ -17,7 +17,7 @@ /** * The YCSB binding for <a href="http://cassandra.apache.org/">Cassandra</a> - * versions 0.7, 0.8, and 1.0.X. + * 2.1+ via CQL. */ package com.yahoo.ycsb.db; diff --git a/cassandra2/src/test/java/com/yahoo/ycsb/db/CassandraCQLClientTest.java b/cassandra/src/test/java/com/yahoo/ycsb/db/CassandraCQLClientTest.java similarity index 100% rename from cassandra2/src/test/java/com/yahoo/ycsb/db/CassandraCQLClientTest.java rename to cassandra/src/test/java/com/yahoo/ycsb/db/CassandraCQLClientTest.java diff --git a/cassandra2/src/test/resources/ycsb.cql b/cassandra/src/test/resources/ycsb.cql similarity index 100% rename from cassandra2/src/test/resources/ycsb.cql rename to cassandra/src/test/resources/ycsb.cql diff --git a/cassandra2/README.md b/cassandra2/README.md deleted file mode 100644 index bee44c8d8c41a36f459d0b38b54d35422dddac1a..0000000000000000000000000000000000000000 --- a/cassandra2/README.md +++ /dev/null @@ -1,80 +0,0 @@ -<!-- -Copyright (c) 2015 YCSB contributors. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); you -may not use this file except in compliance with the License. You -may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -implied. See the License for the specific language governing -permissions and limitations under the License. See accompanying -LICENSE file. ---> - -# Apache Cassandra 2.x CQL binding - -Binding for [Apache Cassandra](http://cassandra.apache.org), using the CQL API -via the [DataStax -driver](http://docs.datastax.com/en/developer/java-driver/2.1/java-driver/whatsNew2.html). - -To run against the (deprecated) Cassandra Thrift API, use the `cassandra-10` binding. - -## Creating a table for use with YCSB - -For keyspace `ycsb`, table `usertable`: - - cqlsh> create keyspace ycsb - WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 3 }; - cqlsh> USE ycsb; - cqlsh> create table usertable ( - y_id varchar primary key, - field0 varchar, - field1 varchar, - field2 varchar, - field3 varchar, - field4 varchar, - field5 varchar, - field6 varchar, - field7 varchar, - field8 varchar, - field9 varchar); - -**Note that `replication_factor` and consistency levels (below) will affect performance.** - -## Cassandra Configuration Parameters - -- `hosts` (**required**) - - Cassandra nodes to connect to. - - No default. - -* `port` - * CQL port for communicating with Cassandra cluster. - * Default is `9042`. - -- `cassandra.keyspace` - Keyspace name - must match the keyspace for the table created (see above). - See http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html for details. - - - Default value is `ycsb` - -- `cassandra.username` -- `cassandra.password` - - Optional user name and password for authentication. See http://docs.datastax.com/en/cassandra/2.0/cassandra/security/security_config_native_authenticate_t.html for details. - -* `cassandra.readconsistencylevel` -* `cassandra.writeconsistencylevel` - - * Default value is `ONE` - - Consistency level for reads and writes, respectively. See the [DataStax documentation](http://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html) for details. - * *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above. - -* `cassandra.maxconnections` -* `cassandra.coreconnections` - * Defaults for max and core connections can be found here: https://datastax.github.io/java-driver/2.1.8/features/pooling/#pool-size. Cassandra 2.0.X falls under protocol V2, Cassandra 2.1+ falls under protocol V3. -* `cassandra.connecttimeoutmillis` -* `cassandra.readtimeoutmillis` - * Defaults for connect and read timeouts can be found here: https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/SocketOptions.html. \ No newline at end of file diff --git a/cassandra2/pom.xml b/cassandra2/pom.xml deleted file mode 100644 index 28a843bc79fbde28efb990cd4f0192f82f088163..0000000000000000000000000000000000000000 --- a/cassandra2/pom.xml +++ /dev/null @@ -1,81 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> - -<!-- -Copyright (c) 2012-2016 YCSB contributors. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); you -may not use this file except in compliance with the License. You -may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -implied. See the License for the specific language governing -permissions and limitations under the License. See accompanying -LICENSE file. ---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>com.yahoo.ycsb</groupId> - <artifactId>binding-parent</artifactId> - <version>0.11.0-SNAPSHOT</version> - <relativePath>../binding-parent</relativePath> - </parent> - - <artifactId>cassandra2-binding</artifactId> - <name>Cassandra 2.1+ DB Binding</name> - <packaging>jar</packaging> - - <properties> - <!-- Skip tests by default. will be activated by jdk8 profile --> - <skipTests>true</skipTests> - </properties> - - <dependencies> - <!-- CQL driver --> - <dependency> - <groupId>com.datastax.cassandra</groupId> - <artifactId>cassandra-driver-core</artifactId> - <version>${cassandra2.cql.version}</version> - </dependency> - <dependency> - <groupId>com.yahoo.ycsb</groupId> - <artifactId>core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.cassandraunit</groupId> - <artifactId>cassandra-unit</artifactId> - <version>3.0.0.1</version> - <classifier>shaded</classifier> - <scope>test</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.12</version> - <scope>test</scope> - </dependency> - </dependencies> - - <profiles> - <!-- Cassandra 2.2+ requires JDK8 to run, so none of our tests - will work unless we're using jdk8. - --> - <profile> - <id>jdk8-tests</id> - <activation> - <jdk>1.8</jdk> - </activation> - <properties> - <skipTests>false</skipTests> - </properties> - </profile> - </profiles> -</project> diff --git a/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java b/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java deleted file mode 100644 index d4dc8c7707b429f090561967d35acaa5052c4cac..0000000000000000000000000000000000000000 --- a/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java +++ /dev/null @@ -1,480 +0,0 @@ -/** - * Copyright (c) 2013-2015 YCSB contributors. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. See accompanying LICENSE file. - * - * Submitted by Chrisjan Matser on 10/11/2010. - */ -package com.yahoo.ycsb.db; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ColumnDefinitions; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.Host; -import com.datastax.driver.core.HostDistance; -import com.datastax.driver.core.Metadata; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.SimpleStatement; -import com.datastax.driver.core.Statement; -import com.datastax.driver.core.querybuilder.Insert; -import com.datastax.driver.core.querybuilder.QueryBuilder; -import com.datastax.driver.core.querybuilder.Select; -import com.yahoo.ycsb.ByteArrayByteIterator; -import com.yahoo.ycsb.ByteIterator; -import com.yahoo.ycsb.DB; -import com.yahoo.ycsb.DBException; -import com.yahoo.ycsb.Status; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.Vector; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Cassandra 2.x CQL client. - * - * See {@code cassandra2/README.md} for details. - * - * @author cmatser - */ -public class CassandraCQLClient extends DB { - - private static Cluster cluster = null; - private static Session session = null; - - private static ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE; - private static ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE; - - public static final String YCSB_KEY = "y_id"; - public static final String KEYSPACE_PROPERTY = "cassandra.keyspace"; - public static final String KEYSPACE_PROPERTY_DEFAULT = "ycsb"; - public static final String USERNAME_PROPERTY = "cassandra.username"; - public static final String PASSWORD_PROPERTY = "cassandra.password"; - - public static final String HOSTS_PROPERTY = "hosts"; - public static final String PORT_PROPERTY = "port"; - public static final String PORT_PROPERTY_DEFAULT = "9042"; - - public static final String READ_CONSISTENCY_LEVEL_PROPERTY = - "cassandra.readconsistencylevel"; - public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; - public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = - "cassandra.writeconsistencylevel"; - public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; - - public static final String MAX_CONNECTIONS_PROPERTY = - "cassandra.maxconnections"; - public static final String CORE_CONNECTIONS_PROPERTY = - "cassandra.coreconnections"; - public static final String CONNECT_TIMEOUT_MILLIS_PROPERTY = - "cassandra.connecttimeoutmillis"; - public static final String READ_TIMEOUT_MILLIS_PROPERTY = - "cassandra.readtimeoutmillis"; - - /** - * Count the number of times initialized to teardown on the last - * {@link #cleanup()}. - */ - private static final AtomicInteger INIT_COUNT = new AtomicInteger(0); - - private static boolean debug = false; - - /** - * Initialize any state for this DB. Called once per DB instance; there is one - * DB instance per client thread. - */ - @Override - public void init() throws DBException { - - // Keep track of number of calls to init (for later cleanup) - INIT_COUNT.incrementAndGet(); - - // Synchronized so that we only have a single - // cluster/session instance for all the threads. - synchronized (INIT_COUNT) { - - // Check if the cluster has already been initialized - if (cluster != null) { - return; - } - - try { - - debug = - Boolean.parseBoolean(getProperties().getProperty("debug", "false")); - - String host = getProperties().getProperty(HOSTS_PROPERTY); - if (host == null) { - throw new DBException(String.format( - "Required property \"%s\" missing for CassandraCQLClient", - HOSTS_PROPERTY)); - } - String[] hosts = host.split(","); - String port = getProperties().getProperty(PORT_PROPERTY, PORT_PROPERTY_DEFAULT); - - String username = getProperties().getProperty(USERNAME_PROPERTY); - String password = getProperties().getProperty(PASSWORD_PROPERTY); - - String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY, - KEYSPACE_PROPERTY_DEFAULT); - - readConsistencyLevel = ConsistencyLevel.valueOf( - getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, - READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - writeConsistencyLevel = ConsistencyLevel.valueOf( - getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, - WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - - if ((username != null) && !username.isEmpty()) { - cluster = Cluster.builder().withCredentials(username, password) - .withPort(Integer.valueOf(port)).addContactPoints(hosts).build(); - } else { - cluster = Cluster.builder().withPort(Integer.valueOf(port)) - .addContactPoints(hosts).build(); - } - - String maxConnections = getProperties().getProperty( - MAX_CONNECTIONS_PROPERTY); - if (maxConnections != null) { - cluster.getConfiguration().getPoolingOptions() - .setMaxConnectionsPerHost(HostDistance.LOCAL, - Integer.valueOf(maxConnections)); - } - - String coreConnections = getProperties().getProperty( - CORE_CONNECTIONS_PROPERTY); - if (coreConnections != null) { - cluster.getConfiguration().getPoolingOptions() - .setCoreConnectionsPerHost(HostDistance.LOCAL, - Integer.valueOf(coreConnections)); - } - - String connectTimoutMillis = getProperties().getProperty( - CONNECT_TIMEOUT_MILLIS_PROPERTY); - if (connectTimoutMillis != null) { - cluster.getConfiguration().getSocketOptions() - .setConnectTimeoutMillis(Integer.valueOf(connectTimoutMillis)); - } - - String readTimoutMillis = getProperties().getProperty( - READ_TIMEOUT_MILLIS_PROPERTY); - if (readTimoutMillis != null) { - cluster.getConfiguration().getSocketOptions() - .setReadTimeoutMillis(Integer.valueOf(readTimoutMillis)); - } - - Metadata metadata = cluster.getMetadata(); - System.err.printf("Connected to cluster: %s\n", - metadata.getClusterName()); - - for (Host discoveredHost : metadata.getAllHosts()) { - System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", - discoveredHost.getDatacenter(), discoveredHost.getAddress(), - discoveredHost.getRack()); - } - - session = cluster.connect(keyspace); - - } catch (Exception e) { - throw new DBException(e); - } - } // synchronized - } - - /** - * Cleanup any state for this DB. Called once per DB instance; there is one DB - * instance per client thread. - */ - @Override - public void cleanup() throws DBException { - synchronized (INIT_COUNT) { - final int curInitCount = INIT_COUNT.decrementAndGet(); - if (curInitCount <= 0) { - session.close(); - cluster.close(); - cluster = null; - session = null; - } - if (curInitCount < 0) { - // This should never happen. - throw new DBException( - String.format("initCount is negative: %d", curInitCount)); - } - } - } - - /** - * Read a record from the database. Each field/value pair from the result will - * be stored in a HashMap. - * - * @param table - * The name of the table - * @param key - * The record key of the record to read. - * @param fields - * The list of fields to read, or null for all of them - * @param result - * A HashMap of field/value pairs for the result - * @return Zero on success, a non-zero error code on error - */ - @Override - public Status read(String table, String key, Set<String> fields, - HashMap<String, ByteIterator> result) { - try { - Statement stmt; - Select.Builder selectBuilder; - - if (fields == null) { - selectBuilder = QueryBuilder.select().all(); - } else { - selectBuilder = QueryBuilder.select(); - for (String col : fields) { - ((Select.Selection) selectBuilder).column(col); - } - } - - stmt = selectBuilder.from(table).where(QueryBuilder.eq(YCSB_KEY, key)) - .limit(1); - stmt.setConsistencyLevel(readConsistencyLevel); - - if (debug) { - System.out.println(stmt.toString()); - } - - ResultSet rs = session.execute(stmt); - - if (rs.isExhausted()) { - return Status.NOT_FOUND; - } - - // Should be only 1 row - Row row = rs.one(); - ColumnDefinitions cd = row.getColumnDefinitions(); - - for (ColumnDefinitions.Definition def : cd) { - ByteBuffer val = row.getBytesUnsafe(def.getName()); - if (val != null) { - result.put(def.getName(), new ByteArrayByteIterator(val.array())); - } else { - result.put(def.getName(), null); - } - } - - return Status.OK; - - } catch (Exception e) { - e.printStackTrace(); - System.out.println("Error reading key: " + key); - return Status.ERROR; - } - - } - - /** - * Perform a range scan for a set of records in the database. Each field/value - * pair from the result will be stored in a HashMap. - * - * Cassandra CQL uses "token" method for range scan which doesn't always yield - * intuitive results. - * - * @param table - * The name of the table - * @param startkey - * The record key of the first record to read. - * @param recordcount - * The number of records to read - * @param fields - * The list of fields to read, or null for all of them - * @param result - * A Vector of HashMaps, where each HashMap is a set field/value - * pairs for one record - * @return Zero on success, a non-zero error code on error - */ - @Override - public Status scan(String table, String startkey, int recordcount, - Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { - - try { - Statement stmt; - Select.Builder selectBuilder; - - if (fields == null) { - selectBuilder = QueryBuilder.select().all(); - } else { - selectBuilder = QueryBuilder.select(); - for (String col : fields) { - ((Select.Selection) selectBuilder).column(col); - } - } - - stmt = selectBuilder.from(table); - - // The statement builder is not setup right for tokens. - // So, we need to build it manually. - String initialStmt = stmt.toString(); - StringBuilder scanStmt = new StringBuilder(); - scanStmt.append(initialStmt.substring(0, initialStmt.length() - 1)); - scanStmt.append(" WHERE "); - scanStmt.append(QueryBuilder.token(YCSB_KEY)); - scanStmt.append(" >= "); - scanStmt.append("token('"); - scanStmt.append(startkey); - scanStmt.append("')"); - scanStmt.append(" LIMIT "); - scanStmt.append(recordcount); - - stmt = new SimpleStatement(scanStmt.toString()); - stmt.setConsistencyLevel(readConsistencyLevel); - - if (debug) { - System.out.println(stmt.toString()); - } - - ResultSet rs = session.execute(stmt); - - HashMap<String, ByteIterator> tuple; - while (!rs.isExhausted()) { - Row row = rs.one(); - tuple = new HashMap<String, ByteIterator>(); - - ColumnDefinitions cd = row.getColumnDefinitions(); - - for (ColumnDefinitions.Definition def : cd) { - ByteBuffer val = row.getBytesUnsafe(def.getName()); - if (val != null) { - tuple.put(def.getName(), new ByteArrayByteIterator(val.array())); - } else { - tuple.put(def.getName(), null); - } - } - - result.add(tuple); - } - - return Status.OK; - - } catch (Exception e) { - e.printStackTrace(); - System.out.println("Error scanning with startkey: " + startkey); - return Status.ERROR; - } - - } - - /** - * Update a record in the database. Any field/value pairs in the specified - * values HashMap will be written into the record with the specified record - * key, overwriting any existing values with the same field name. - * - * @param table - * The name of the table - * @param key - * The record key of the record to write. - * @param values - * A HashMap of field/value pairs to update in the record - * @return Zero on success, a non-zero error code on error - */ - @Override - public Status update(String table, String key, - HashMap<String, ByteIterator> values) { - // Insert and updates provide the same functionality - return insert(table, key, values); - } - - /** - * Insert a record in the database. Any field/value pairs in the specified - * values HashMap will be written into the record with the specified record - * key. - * - * @param table - * The name of the table - * @param key - * The record key of the record to insert. - * @param values - * A HashMap of field/value pairs to insert in the record - * @return Zero on success, a non-zero error code on error - */ - @Override - public Status insert(String table, String key, - HashMap<String, ByteIterator> values) { - - try { - Insert insertStmt = QueryBuilder.insertInto(table); - - // Add key - insertStmt.value(YCSB_KEY, key); - - // Add fields - for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { - Object value; - ByteIterator byteIterator = entry.getValue(); - value = byteIterator.toString(); - - insertStmt.value(entry.getKey(), value); - } - - insertStmt.setConsistencyLevel(writeConsistencyLevel); - - if (debug) { - System.out.println(insertStmt.toString()); - } - - session.execute(insertStmt); - - return Status.OK; - } catch (Exception e) { - e.printStackTrace(); - } - - return Status.ERROR; - } - - /** - * Delete a record from the database. - * - * @param table - * The name of the table - * @param key - * The record key of the record to delete. - * @return Zero on success, a non-zero error code on error - */ - @Override - public Status delete(String table, String key) { - - try { - Statement stmt; - - stmt = QueryBuilder.delete().from(table) - .where(QueryBuilder.eq(YCSB_KEY, key)); - stmt.setConsistencyLevel(writeConsistencyLevel); - - if (debug) { - System.out.println(stmt.toString()); - } - - session.execute(stmt); - - return Status.OK; - } catch (Exception e) { - e.printStackTrace(); - System.out.println("Error deleting key: " + key); - } - - return Status.ERROR; - } - -} diff --git a/cassandra2/src/main/java/com/yahoo/ycsb/db/package-info.java b/cassandra2/src/main/java/com/yahoo/ycsb/db/package-info.java deleted file mode 100644 index 007f01dc54dd9d543b5380dff68880f23b1c009b..0000000000000000000000000000000000000000 --- a/cassandra2/src/main/java/com/yahoo/ycsb/db/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright (c) 2014, Yahoo!, Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. - */ - -/** - * The YCSB binding for <a href="http://cassandra.apache.org/">Cassandra</a> - * 2.1+ via CQL. - */ -package com.yahoo.ycsb.db; - diff --git a/pom.xml b/pom.xml index 2966efdcce401ae043f12018ec98844ded084d14..b3028f591c10825e5270f114d818749f6b790bd8 100644 --- a/pom.xml +++ b/pom.xml @@ -73,9 +73,7 @@ LICENSE file. <hbase098.version>0.98.14-hadoop2</hbase098.version> <hbase10.version>1.0.2</hbase10.version> <accumulo.version>1.6.0</accumulo.version> - <cassandra.version>1.2.9</cassandra.version> - <cassandra.cql.version>1.0.3</cassandra.cql.version> - <cassandra2.cql.version>3.0.0</cassandra2.cql.version> + <cassandra.cql.version>3.0.0</cassandra.cql.version> <geode.version>1.0.0-incubating.M2</geode.version> <googlebigtable.version>0.2.3</googlebigtable.version> <infinispan.version>7.2.2.Final</infinispan.version> @@ -108,7 +106,6 @@ LICENSE file. <module>aerospike</module> <module>asynchbase</module> <module>cassandra</module> - <module>cassandra2</module> <module>couchbase</module> <module>couchbase2</module> <module>distribution</module>