diff --git a/bin/ycsb b/bin/ycsb index a3769f18bd030338cf96285041aa607b93870590..8036dc71bbb7f8116f1cf6cd273e8ff953615234 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -51,6 +51,7 @@ DATABASES = { "cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8", "cassandra-10" : "com.yahoo.ycsb.db.CassandraClient10", "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient", + "cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "couchbase" : "com.yahoo.ycsb.db.CouchbaseClient", "dynamodb" : "com.yahoo.ycsb.db.DynamoDBClient", "elasticsearch": "com.yahoo.ycsb.db.ElasticSearchClient", diff --git a/cassandra/README.md b/cassandra/README.md new file mode 100644 index 0000000000000000000000000000000000000000..11853f97d4031187931bd77c6669cf54cf1f6be3 --- /dev/null +++ b/cassandra/README.md @@ -0,0 +1,65 @@ +<!-- +Copyright (c) 2015 YCSB contributors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +# Cassandra (0.7, 0.8, 1.x) drivers for YCSB + +**For Cassandra 2 CQL support, use the `cassandra2-cql` binding. The Thrift drivers below are deprecated, and the CQL driver here does not support Cassandra 2.1+.** + +There are three drivers in the Cassandra binding: + +* `cassandra-7`: Cassandra 0.7 Thrift binding. +* `cassandra-8`: Cassandra 0.8 Thrift binding. +* `cassandra-10`: Cassandra 1.0+ Thrift binding. +* `cassandra-cql`: Cassandra CQL binding, for Cassandra 1.x to 2.0. See `cassandra2/README.md` for details on parameters. + +# `cassandra-10` + +## Creating a table + +Using `cassandra-cli`: + + create keyspace usertable with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options = {replication_factor:1}; + + create column family data with column_type = 'Standard' and comparator = 'UTF8Type'; + +**Note that `replication_factor` and consistency levels (below) will affect performance.** + +## Configuration Parameters + +- `hosts` (**required**) + - Cassandra nodes to connect to. + - No default. + +* `port` + - Thrift port for communicating with Cassandra cluster. + * Default is `9160`. + +- `cassandra.columnfamily` + - Column family name - must match the column family for the table created (see above). + - Default value is `data` + +- `cassandra.username` +- `cassandra.password` + - Optional user name and password for authentication. See http://docs.datastax.com/en/cassandra/2.0/cassandra/security/security_config_native_authenticate_t.html for details. + +* `cassandra.readconsistencylevel` +* `cassandra.scanconsistencylevel` +* `cassandra.writeconsistencylevel` + + - Default value is `ONE` + - Consistency level for reads and writes, respectively. See the [DataStax documentation](http://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html) for details. + - *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above. diff --git a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java index 90e27d768591f1099fad6e76807d64889f2c03c7..578a5a8f50ef28778d996e5181c1962d1d42dac2 100755 --- a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java +++ b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java @@ -33,22 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger; /** * Tested with Cassandra 2.0, CQL client for YCSB framework - * - * In CQLSH, create keyspace and table. Something like: - * cqlsh> create keyspace ycsb - * WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 1 }; - * cqlsh> create table usertable ( - * y_id varchar primary key, - * field0 varchar, - * field1 varchar, - * field2 varchar, - * field3 varchar, - * field4 varchar, - * field5 varchar, - * field6 varchar, - * field7 varchar, - * field8 varchar, - * field9 varchar); + * + * See {@code cassandra2} for a version compatible with Cassandra 2.1+. + * See {@code cassandra2/README.md} for details. * * @author cmatser */ diff --git a/cassandra2/README.md b/cassandra2/README.md new file mode 100644 index 0000000000000000000000000000000000000000..e3e56b90c7cfbbe2d11c2efcaf5cb277af00bdb8 --- /dev/null +++ b/cassandra2/README.md @@ -0,0 +1,73 @@ +<!-- +Copyright (c) 2015 YCSB contributors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +# Apache Cassandra 2.x CQL binding + +Binding for [Apache Cassandra](http://cassandra.apache.org), using the CQL API +via the [DataStax +driver](http://docs.datastax.com/en/developer/java-driver/2.1/java-driver/whatsNew2.html). + +To run against the (deprecated) Cassandra Thrift API, use the `cassandra-10` binding. + +## Creating a table for use with YCSB + +For keyspace `ycsb`, table `usertable`: + + cqlsh> create keyspace ycsb + WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 3 }; + cqlsh> USE ycsb; + cqlsh> create table usertable ( + y_id varchar primary key, + field0 varchar, + field1 varchar, + field2 varchar, + field3 varchar, + field4 varchar, + field5 varchar, + field6 varchar, + field7 varchar, + field8 varchar, + field9 varchar); + +**Note that `replication_factor` and consistency levels (below) will affect performance.** + +## Cassandra Configuration Parameters + +- `hosts` (**required**) + - Cassandra nodes to connect to. + - No default. + +* `port` + * CQL port for communicating with Cassandra cluster. + * Default is `9042`. + +- `cassandra.keyspace` + Keyspace name - must match the keyspace for the table created (see above). + See http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html for details. + + - Default value is `ycsb` + +- `cassandra.username` +- `cassandra.password` + - Optional user name and password for authentication. See http://docs.datastax.com/en/cassandra/2.0/cassandra/security/security_config_native_authenticate_t.html for details. + +* `cassandra.readconsistencylevel` +* `cassandra.writeconsistencylevel` + + * Default value is `ONE` + - Consistency level for reads and writes, respectively. See the [DataStax documentation](http://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html) for details. + * *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above. diff --git a/cassandra2/pom.xml b/cassandra2/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..7ad8132d61d5f83ab5939efa9eb521dbc889325a --- /dev/null +++ b/cassandra2/pom.xml @@ -0,0 +1,61 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- +Copyright (c) 2012-2015 YCSB contributors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>binding-parent</artifactId> + <version>0.5.0-SNAPSHOT</version> + <relativePath>../binding-parent</relativePath> + </parent> + + <artifactId>cassandra2-binding</artifactId> + <name>Cassandra 2.1+ DB Binding</name> + <packaging>jar</packaging> + + <dependencies> + <!-- CQL driver --> + <dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-core</artifactId> + <version>${cassandra2.cql.version}</version> + </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.cassandraunit</groupId> + <artifactId>cassandra-unit-shaded</artifactId> + <version>2.1.9.2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + + </dependencies> +</project> diff --git a/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java b/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java new file mode 100644 index 0000000000000000000000000000000000000000..0fa2b3014fb9e34e8aa0655051a779b0daa02408 --- /dev/null +++ b/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java @@ -0,0 +1,433 @@ +/** + * Copyright (c) 2013-2015 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. See accompanying LICENSE file. + * + * Submitted by Chrisjan Matser on 10/11/2010. + */ +package com.yahoo.ycsb.db; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Select; +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * Cassandra 2.x CQL client. + * + * See {@code cassandra2/README.md} for details. + * + * @author cmatser + */ +public class CassandraCQLClient extends DB { + + protected static Cluster cluster = null; + protected static Session session = null; + + private static ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE; + private static ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE; + + public static final int OK = 0; + public static final int ERR = -1; + public static final int NOT_FOUND = -3; + + public static final String YCSB_KEY = "y_id"; + public static final String KEYSPACE_PROPERTY = "cassandra.keyspace"; + public static final String KEYSPACE_PROPERTY_DEFAULT = "ycsb"; + public static final String USERNAME_PROPERTY = "cassandra.username"; + public static final String PASSWORD_PROPERTY = "cassandra.password"; + + public static final String HOSTS_PROPERTY = "hosts"; + public static final String PORT_PROPERTY = "port"; + + + public static final String READ_CONSISTENCY_LEVEL_PROPERTY = "cassandra.readconsistencylevel"; + public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; + public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = "cassandra.writeconsistencylevel"; + public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; + + /** Count the number of times initialized to teardown on the last {@link #cleanup()}. */ + private static final AtomicInteger initCount = new AtomicInteger(0); + + private static boolean _debug = false; + + /** + * Initialize any state for this DB. Called once per DB instance; there is + * one DB instance per client thread. + */ + @Override + public void init() throws DBException { + + //Keep track of number of calls to init (for later cleanup) + initCount.incrementAndGet(); + + //Synchronized so that we only have a single + // cluster/session instance for all the threads. + synchronized (initCount) { + + //Check if the cluster has already been initialized + if (cluster != null) { + return; + } + + try { + + _debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); + + String host = getProperties().getProperty(HOSTS_PROPERTY); + if (host == null) { + throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", HOSTS_PROPERTY)); + } + String hosts[] = host.split(","); + String port = getProperties().getProperty("port", "9042"); + if (port == null) { + throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", PORT_PROPERTY)); + } + + String username = getProperties().getProperty(USERNAME_PROPERTY); + String password = getProperties().getProperty(PASSWORD_PROPERTY); + + String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY, KEYSPACE_PROPERTY_DEFAULT); + + readConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); + writeConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); + + // public void connect(String node) {} + if ((username != null) && !username.isEmpty()) { + cluster = Cluster.builder() + .withCredentials(username, password) + .withPort(Integer.valueOf(port)) + .addContactPoints(hosts).build(); + } + else { + cluster = Cluster.builder() + .withPort(Integer.valueOf(port)) + .addContactPoints(hosts).build(); + } + + //Update number of connections based on threads + int threadcount = Integer.parseInt(getProperties().getProperty("threadcount","1")); + cluster.getConfiguration().getPoolingOptions().setMaxConnectionsPerHost(HostDistance.LOCAL, threadcount); + + //Set connection timeout 3min (default is 5s) + cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis(3*60*1000); + //Set read (execute) timeout 3min (default is 12s) + cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(3*60*1000); + + Metadata metadata = cluster.getMetadata(); + System.err.printf("Connected to cluster: %s\n", metadata.getClusterName()); + + for (Host discoveredHost : metadata.getAllHosts()) { + System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", + discoveredHost.getDatacenter(), + discoveredHost.getAddress(), + discoveredHost.getRack()); + } + + session = cluster.connect(keyspace); + + } catch (Exception e) { + throw new DBException(e); + } + }//synchronized + } + + /** + * Cleanup any state for this DB. Called once per DB instance; there is one + * DB instance per client thread. + */ + @Override + public void cleanup() throws DBException { + synchronized(initCount) { + final int curInitCount = initCount.decrementAndGet(); + if (curInitCount <= 0) { + session.close(); + cluster.close(); + cluster = null; + session = null; + } + if (curInitCount < 0) { + // This should never happen. + throw new DBException( + String.format("initCount is negative: %d", curInitCount)); + } + } + } + + /** + * Read a record from the database. Each field/value pair from the result + * will be stored in a HashMap. + * + * @param table The name of the table + * @param key The record key of the record to read. + * @param fields The list of fields to read, or null for all of them + * @param result A HashMap of field/value pairs for the result + * @return Zero on success, a non-zero error code on error + */ + @Override + public int read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) { + try { + Statement stmt; + Select.Builder selectBuilder; + + if (fields == null) { + selectBuilder = QueryBuilder.select().all(); + } + else { + selectBuilder = QueryBuilder.select(); + for (String col : fields) { + ((Select.Selection) selectBuilder).column(col); + } + } + + stmt = selectBuilder.from(table).where(QueryBuilder.eq(YCSB_KEY, key)).limit(1); + stmt.setConsistencyLevel(readConsistencyLevel); + + if (_debug) { + System.out.println(stmt.toString()); + } + + ResultSet rs = session.execute(stmt); + + if (rs.isExhausted()) { + return NOT_FOUND; + } + + //Should be only 1 row + Row row = rs.one(); + ColumnDefinitions cd = row.getColumnDefinitions(); + + for (ColumnDefinitions.Definition def : cd) { + ByteBuffer val = row.getBytesUnsafe(def.getName()); + if (val != null) { + result.put(def.getName(), + new ByteArrayByteIterator(val.array())); + } + else { + result.put(def.getName(), null); + } + } + + return OK; + + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Error reading key: " + key); + return ERR; + } + + } + + /** + * Perform a range scan for a set of records in the database. Each + * field/value pair from the result will be stored in a HashMap. + * + * Cassandra CQL uses "token" method for range scan which doesn't always + * yield intuitive results. + * + * @param table The name of the table + * @param startkey The record key of the first record to read. + * @param recordcount The number of records to read + * @param fields The list of fields to read, or null for all of them + * @param result A Vector of HashMaps, where each HashMap is a set + * field/value pairs for one record + * @return Zero on success, a non-zero error code on error + */ + @Override + public int scan(String table, String startkey, int recordcount, Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { + + try { + Statement stmt; + Select.Builder selectBuilder; + + if (fields == null) { + selectBuilder = QueryBuilder.select().all(); + } + else { + selectBuilder = QueryBuilder.select(); + for (String col : fields) { + ((Select.Selection) selectBuilder).column(col); + } + } + + stmt = selectBuilder.from(table); + + //The statement builder is not setup right for tokens. + // So, we need to build it manually. + String initialStmt = stmt.toString(); + StringBuilder scanStmt = new StringBuilder(); + scanStmt.append( + initialStmt.substring(0, initialStmt.length()-1)); + scanStmt.append(" WHERE "); + scanStmt.append(QueryBuilder.token(YCSB_KEY)); + scanStmt.append(" >= "); + scanStmt.append("token('"); + scanStmt.append(startkey); + scanStmt.append("')"); + scanStmt.append(" LIMIT "); + scanStmt.append(recordcount); + + stmt = new SimpleStatement(scanStmt.toString()); + stmt.setConsistencyLevel(readConsistencyLevel); + + if (_debug) { + System.out.println(stmt.toString()); + } + + ResultSet rs = session.execute(stmt); + + HashMap<String, ByteIterator> tuple; + while (!rs.isExhausted()) { + Row row = rs.one(); + tuple = new HashMap<String, ByteIterator> (); + + ColumnDefinitions cd = row.getColumnDefinitions(); + + for (ColumnDefinitions.Definition def : cd) { + ByteBuffer val = row.getBytesUnsafe(def.getName()); + if (val != null) { + tuple.put(def.getName(), + new ByteArrayByteIterator(val.array())); + } + else { + tuple.put(def.getName(), null); + } + } + + result.add(tuple); + } + + return OK; + + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Error scanning with startkey: " + startkey); + return ERR; + } + + } + + /** + * Update a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key, overwriting any existing values with the same field name. + * + * @param table The name of the table + * @param key The record key of the record to write. + * @param values A HashMap of field/value pairs to update in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public int update(String table, String key, HashMap<String, ByteIterator> values) { + //Insert and updates provide the same functionality + return insert(table, key, values); + } + + /** + * Insert a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key. + * + * @param table The name of the table + * @param key The record key of the record to insert. + * @param values A HashMap of field/value pairs to insert in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public int insert(String table, String key, HashMap<String, ByteIterator> values) { + + try { + Insert insertStmt = QueryBuilder.insertInto(table); + + //Add key + insertStmt.value(YCSB_KEY, key); + + //Add fields + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { + Object value; + ByteIterator byteIterator = entry.getValue(); + value = byteIterator.toString(); + + insertStmt.value(entry.getKey(), value); + } + + insertStmt.setConsistencyLevel(writeConsistencyLevel).enableTracing(); + + if (_debug) { + System.out.println(insertStmt.toString()); + } + + ResultSet rs = session.execute(insertStmt); + + return OK; + } catch (Exception e) { + e.printStackTrace(); + } + + return ERR; + } + + /** + * Delete a record from the database. + * + * @param table The name of the table + * @param key The record key of the record to delete. + * @return Zero on success, a non-zero error code on error + */ + @Override + public int delete(String table, String key) { + + try { + Statement stmt; + + stmt = QueryBuilder.delete().from(table).where(QueryBuilder.eq(YCSB_KEY, key)); + stmt.setConsistencyLevel(writeConsistencyLevel); + + if (_debug) { + System.out.println(stmt.toString()); + } + + ResultSet rs = session.execute(stmt); + + return OK; + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Error deleting key: " + key); + } + + return ERR; + } + +} diff --git a/cassandra2/src/test/java/com/yahoo/ycsb/db/CassandraCQLClientTest.java b/cassandra2/src/test/java/com/yahoo/ycsb/db/CassandraCQLClientTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f4339d55a7f0a1540d6475fe2cc45627a3860fd4 --- /dev/null +++ b/cassandra2/src/test/java/com/yahoo/ycsb/db/CassandraCQLClientTest.java @@ -0,0 +1,176 @@ +/** + * Copyright (c) 2015 YCSB contributors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +import com.google.common.collect.Sets; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Select; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.StringByteIterator; +import com.yahoo.ycsb.measurements.Measurements; +import com.yahoo.ycsb.workloads.CoreWorkload; + +import org.cassandraunit.CassandraCQLUnit; +import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Integration tests for the Cassandra client + */ +public class CassandraCQLClientTest { + private final static String TABLE = "usertable"; + private final static String HOST = "localhost"; + private final static int PORT = 9142; + private final static String DEFAULT_ROW_KEY = "user1"; + + private CassandraCQLClient client; + private Session session; + + @ClassRule + public static CassandraCQLUnit cassandraUnit = + new CassandraCQLUnit(new ClassPathCQLDataSet("ycsb.cql", "ycsb")); + + @Before + public void setUpClient() throws Exception { + Properties p = new Properties(); + p.setProperty("hosts", HOST); + p.setProperty("port", Integer.toString(PORT)); + p.setProperty("table", TABLE); + + Measurements.setProperties(p); + final CoreWorkload workload = new CoreWorkload(); + workload.init(p); + client = new CassandraCQLClient(); + client.setProperties(p); + client.init(); + } + + @Before + public void setSession() { + session = cassandraUnit.getSession(); + } + + @After + public void tearDownClient() throws Exception { + client.cleanup(); + client = null; + } + + @After + public void clearTable() throws Exception { + // Clear the table so that each test starts fresh. + final Statement truncate = QueryBuilder.truncate(TABLE); + cassandraUnit.getSession().execute(truncate); + } + + @Test + public void testReadMissingRow() throws Exception { + final HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>(); + final int status = client.read(TABLE, "Missing row", null, result); + assertThat(result.size(), is(0)); + assertThat(status, is(CassandraCQLClient.NOT_FOUND)); + } + + private void insertRow() { + final String rowKey = DEFAULT_ROW_KEY; + Insert insertStmt = QueryBuilder.insertInto(TABLE); + insertStmt.value(CassandraCQLClient.YCSB_KEY, rowKey); + + insertStmt.value("field0", "value1"); + insertStmt.value("field1", "value2"); + session.execute(insertStmt); + } + + @Test + public void testRead() throws Exception { + insertRow(); + + final HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>(); + final int status = client.read(CoreWorkload.table, DEFAULT_ROW_KEY, null, result); + assertThat(status, is(CassandraCQLClient.OK)); + assertThat(result.entrySet(), hasSize(11)); + assertThat(result, hasEntry("field2", null)); + + final HashMap<String, String> strResult = new HashMap<String, String>(); + for (final Map.Entry<String, ByteIterator> e : result.entrySet()) { + if (e.getValue() != null) { + strResult.put(e.getKey(), e.getValue().toString()); + } + } + assertThat(strResult, hasEntry(CassandraCQLClient.YCSB_KEY, DEFAULT_ROW_KEY)); + assertThat(strResult, hasEntry("field0", "value1")); + assertThat(strResult, hasEntry("field1", "value2")); + } + + @Test + public void testReadSingleColumn() throws Exception { + insertRow(); + final HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>(); + final Set<String> fields = Sets.newHashSet("field1"); + final int status = client.read(CoreWorkload.table, DEFAULT_ROW_KEY, fields, result); + assertThat(status, is(CassandraCQLClient.OK)); + assertThat(result.entrySet(), hasSize(1)); + final Map<String, String> strResult = StringByteIterator.getStringMap(result); + assertThat(strResult, hasEntry("field1", "value2")); + } + + @Test + public void testUpdate() throws Exception { + final String key = "key"; + final HashMap<String, String> input = new HashMap<String, String>(); + input.put("field0", "value1"); + input.put("field1", "value2"); + + final int status = client.insert(TABLE, key, StringByteIterator.getByteIteratorMap(input)); + assertThat(status, is(CassandraCQLClient.OK)); + + // Verify result + final Select selectStmt = + QueryBuilder.select("field0", "field1") + .from(TABLE) + .where(QueryBuilder.eq(CassandraCQLClient.YCSB_KEY, key)) + .limit(1); + + final ResultSet rs = session.execute(selectStmt); + final Row row = rs.one(); + assertThat(row, notNullValue()); + assertThat(rs.isExhausted(), is(true)); + assertThat(row.getString("field0"), is("value1")); + assertThat(row.getString("field1"), is("value2")); + } +} diff --git a/cassandra2/src/test/resources/ycsb.cql b/cassandra2/src/test/resources/ycsb.cql new file mode 100644 index 0000000000000000000000000000000000000000..2888bdce3a0095561a8f68ff34cf2f17521a5ef3 --- /dev/null +++ b/cassandra2/src/test/resources/ycsb.cql @@ -0,0 +1,12 @@ +CREATE TABLE usertable ( + y_id varchar primary key, + field0 varchar, + field1 varchar, + field2 varchar, + field3 varchar, + field4 varchar, + field5 varchar, + field6 varchar, + field7 varchar, + field8 varchar, + field9 varchar); diff --git a/pom.xml b/pom.xml index 5e71d9837daba411d8e4b694d6c0c1ebee0322d0..b08e1bb6fbe3d9e5c007e74ab44a63c8cb3d5e7e 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,7 @@ LICENSE file. <accumulo.version>1.6.0</accumulo.version> <cassandra.version>1.2.9</cassandra.version> <cassandra.cql.version>1.0.3</cassandra.cql.version> + <cassandra2.cql.version>2.1.8</cassandra2.cql.version> <gemfire.version>8.1.0</gemfire.version> <infinispan.version>7.2.2.Final</infinispan.version> <kudu.version>0.5.0</kudu.version> @@ -101,6 +102,7 @@ LICENSE file. <module>accumulo</module> <module>aerospike</module> <module>cassandra</module> + <module>cassandra2</module> <module>couchbase</module> <module>distribution</module> <module>dynamodb</module>