Skip to content
Snippets Groups Projects
Commit 1700b3e2 authored by Connor McCoy's avatar Connor McCoy
Browse files

Merge pull request #454 from cmccoy/cassandra2

Adds a Cassandra 2 CQL binding.
parents 4551273e b91fc2e0
No related branches found
No related tags found
No related merge requests found
...@@ -51,6 +51,7 @@ DATABASES = { ...@@ -51,6 +51,7 @@ DATABASES = {
"cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8", "cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8",
"cassandra-10" : "com.yahoo.ycsb.db.CassandraClient10", "cassandra-10" : "com.yahoo.ycsb.db.CassandraClient10",
"cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient",
"cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient",
"couchbase" : "com.yahoo.ycsb.db.CouchbaseClient", "couchbase" : "com.yahoo.ycsb.db.CouchbaseClient",
"dynamodb" : "com.yahoo.ycsb.db.DynamoDBClient", "dynamodb" : "com.yahoo.ycsb.db.DynamoDBClient",
"elasticsearch": "com.yahoo.ycsb.db.ElasticSearchClient", "elasticsearch": "com.yahoo.ycsb.db.ElasticSearchClient",
......
<!--
Copyright (c) 2015 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
# Cassandra (0.7, 0.8, 1.x) drivers for YCSB
**For Cassandra 2 CQL support, use the `cassandra2-cql` binding. The Thrift drivers below are deprecated, and the CQL driver here does not support Cassandra 2.1+.**
There are three drivers in the Cassandra binding:
* `cassandra-7`: Cassandra 0.7 Thrift binding.
* `cassandra-8`: Cassandra 0.8 Thrift binding.
* `cassandra-10`: Cassandra 1.0+ Thrift binding.
* `cassandra-cql`: Cassandra CQL binding, for Cassandra 1.x to 2.0. See `cassandra2/README.md` for details on parameters.
# `cassandra-10`
## Creating a table
Using `cassandra-cli`:
create keyspace usertable with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options = {replication_factor:1};
create column family data with column_type = 'Standard' and comparator = 'UTF8Type';
**Note that `replication_factor` and consistency levels (below) will affect performance.**
## Configuration Parameters
- `hosts` (**required**)
- Cassandra nodes to connect to.
- No default.
* `port`
- Thrift port for communicating with Cassandra cluster.
* Default is `9160`.
- `cassandra.columnfamily`
- Column family name - must match the column family for the table created (see above).
- Default value is `data`
- `cassandra.username`
- `cassandra.password`
- Optional user name and password for authentication. See http://docs.datastax.com/en/cassandra/2.0/cassandra/security/security_config_native_authenticate_t.html for details.
* `cassandra.readconsistencylevel`
* `cassandra.scanconsistencylevel`
* `cassandra.writeconsistencylevel`
- Default value is `ONE`
- Consistency level for reads and writes, respectively. See the [DataStax documentation](http://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html) for details.
- *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above.
...@@ -33,22 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger; ...@@ -33,22 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Tested with Cassandra 2.0, CQL client for YCSB framework * Tested with Cassandra 2.0, CQL client for YCSB framework
* *
* In CQLSH, create keyspace and table. Something like: * See {@code cassandra2} for a version compatible with Cassandra 2.1+.
* cqlsh> create keyspace ycsb * See {@code cassandra2/README.md} for details.
* WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 1 };
* cqlsh> create table usertable (
* y_id varchar primary key,
* field0 varchar,
* field1 varchar,
* field2 varchar,
* field3 varchar,
* field4 varchar,
* field5 varchar,
* field6 varchar,
* field7 varchar,
* field8 varchar,
* field9 varchar);
* *
* @author cmatser * @author cmatser
*/ */
......
<!--
Copyright (c) 2015 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
# Apache Cassandra 2.x CQL binding
Binding for [Apache Cassandra](http://cassandra.apache.org), using the CQL API
via the [DataStax
driver](http://docs.datastax.com/en/developer/java-driver/2.1/java-driver/whatsNew2.html).
To run against the (deprecated) Cassandra Thrift API, use the `cassandra-10` binding.
## Creating a table for use with YCSB
For keyspace `ycsb`, table `usertable`:
cqlsh> create keyspace ycsb
WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 3 };
cqlsh> USE ycsb;
cqlsh> create table usertable (
y_id varchar primary key,
field0 varchar,
field1 varchar,
field2 varchar,
field3 varchar,
field4 varchar,
field5 varchar,
field6 varchar,
field7 varchar,
field8 varchar,
field9 varchar);
**Note that `replication_factor` and consistency levels (below) will affect performance.**
## Cassandra Configuration Parameters
- `hosts` (**required**)
- Cassandra nodes to connect to.
- No default.
* `port`
* CQL port for communicating with Cassandra cluster.
* Default is `9042`.
- `cassandra.keyspace`
Keyspace name - must match the keyspace for the table created (see above).
See http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html for details.
- Default value is `ycsb`
- `cassandra.username`
- `cassandra.password`
- Optional user name and password for authentication. See http://docs.datastax.com/en/cassandra/2.0/cassandra/security/security_config_native_authenticate_t.html for details.
* `cassandra.readconsistencylevel`
* `cassandra.writeconsistencylevel`
* Default value is `ONE`
- Consistency level for reads and writes, respectively. See the [DataStax documentation](http://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html) for details.
* *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above.
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2012-2015 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.5.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>
<artifactId>cassandra2-binding</artifactId>
<name>Cassandra 2.1+ DB Binding</name>
<packaging>jar</packaging>
<dependencies>
<!-- CQL driver -->
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra2.cql.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit-shaded</artifactId>
<version>2.1.9.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
/**
* Copyright (c) 2013-2015 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License. See accompanying LICENSE file.
*
* Submitted by Chrisjan Matser on 10/11/2010.
*/
package com.yahoo.ycsb.db;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Cassandra 2.x CQL client.
*
* See {@code cassandra2/README.md} for details.
*
* @author cmatser
*/
public class CassandraCQLClient extends DB {
protected static Cluster cluster = null;
protected static Session session = null;
private static ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE;
private static ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE;
public static final int OK = 0;
public static final int ERR = -1;
public static final int NOT_FOUND = -3;
public static final String YCSB_KEY = "y_id";
public static final String KEYSPACE_PROPERTY = "cassandra.keyspace";
public static final String KEYSPACE_PROPERTY_DEFAULT = "ycsb";
public static final String USERNAME_PROPERTY = "cassandra.username";
public static final String PASSWORD_PROPERTY = "cassandra.password";
public static final String HOSTS_PROPERTY = "hosts";
public static final String PORT_PROPERTY = "port";
public static final String READ_CONSISTENCY_LEVEL_PROPERTY = "cassandra.readconsistencylevel";
public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = "cassandra.writeconsistencylevel";
public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
/** Count the number of times initialized to teardown on the last {@link #cleanup()}. */
private static final AtomicInteger initCount = new AtomicInteger(0);
private static boolean _debug = false;
/**
* Initialize any state for this DB. Called once per DB instance; there is
* one DB instance per client thread.
*/
@Override
public void init() throws DBException {
//Keep track of number of calls to init (for later cleanup)
initCount.incrementAndGet();
//Synchronized so that we only have a single
// cluster/session instance for all the threads.
synchronized (initCount) {
//Check if the cluster has already been initialized
if (cluster != null) {
return;
}
try {
_debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false"));
String host = getProperties().getProperty(HOSTS_PROPERTY);
if (host == null) {
throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", HOSTS_PROPERTY));
}
String hosts[] = host.split(",");
String port = getProperties().getProperty("port", "9042");
if (port == null) {
throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", PORT_PROPERTY));
}
String username = getProperties().getProperty(USERNAME_PROPERTY);
String password = getProperties().getProperty(PASSWORD_PROPERTY);
String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY, KEYSPACE_PROPERTY_DEFAULT);
readConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
writeConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
// public void connect(String node) {}
if ((username != null) && !username.isEmpty()) {
cluster = Cluster.builder()
.withCredentials(username, password)
.withPort(Integer.valueOf(port))
.addContactPoints(hosts).build();
}
else {
cluster = Cluster.builder()
.withPort(Integer.valueOf(port))
.addContactPoints(hosts).build();
}
//Update number of connections based on threads
int threadcount = Integer.parseInt(getProperties().getProperty("threadcount","1"));
cluster.getConfiguration().getPoolingOptions().setMaxConnectionsPerHost(HostDistance.LOCAL, threadcount);
//Set connection timeout 3min (default is 5s)
cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis(3*60*1000);
//Set read (execute) timeout 3min (default is 12s)
cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(3*60*1000);
Metadata metadata = cluster.getMetadata();
System.err.printf("Connected to cluster: %s\n", metadata.getClusterName());
for (Host discoveredHost : metadata.getAllHosts()) {
System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n",
discoveredHost.getDatacenter(),
discoveredHost.getAddress(),
discoveredHost.getRack());
}
session = cluster.connect(keyspace);
} catch (Exception e) {
throw new DBException(e);
}
}//synchronized
}
/**
* Cleanup any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
@Override
public void cleanup() throws DBException {
synchronized(initCount) {
final int curInitCount = initCount.decrementAndGet();
if (curInitCount <= 0) {
session.close();
cluster.close();
cluster = null;
session = null;
}
if (curInitCount < 0) {
// This should never happen.
throw new DBException(
String.format("initCount is negative: %d", curInitCount));
}
}
}
/**
* Read a record from the database. Each field/value pair from the result
* will be stored in a HashMap.
*
* @param table The name of the table
* @param key The record key of the record to read.
* @param fields The list of fields to read, or null for all of them
* @param result A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error
*/
@Override
public int read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
try {
Statement stmt;
Select.Builder selectBuilder;
if (fields == null) {
selectBuilder = QueryBuilder.select().all();
}
else {
selectBuilder = QueryBuilder.select();
for (String col : fields) {
((Select.Selection) selectBuilder).column(col);
}
}
stmt = selectBuilder.from(table).where(QueryBuilder.eq(YCSB_KEY, key)).limit(1);
stmt.setConsistencyLevel(readConsistencyLevel);
if (_debug) {
System.out.println(stmt.toString());
}
ResultSet rs = session.execute(stmt);
if (rs.isExhausted()) {
return NOT_FOUND;
}
//Should be only 1 row
Row row = rs.one();
ColumnDefinitions cd = row.getColumnDefinitions();
for (ColumnDefinitions.Definition def : cd) {
ByteBuffer val = row.getBytesUnsafe(def.getName());
if (val != null) {
result.put(def.getName(),
new ByteArrayByteIterator(val.array()));
}
else {
result.put(def.getName(), null);
}
}
return OK;
} catch (Exception e) {
e.printStackTrace();
System.out.println("Error reading key: " + key);
return ERR;
}
}
/**
* Perform a range scan for a set of records in the database. Each
* field/value pair from the result will be stored in a HashMap.
*
* Cassandra CQL uses "token" method for range scan which doesn't always
* yield intuitive results.
*
* @param table The name of the table
* @param startkey The record key of the first record to read.
* @param recordcount The number of records to read
* @param fields The list of fields to read, or null for all of them
* @param result A Vector of HashMaps, where each HashMap is a set
* field/value pairs for one record
* @return Zero on success, a non-zero error code on error
*/
@Override
public int scan(String table, String startkey, int recordcount, Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
try {
Statement stmt;
Select.Builder selectBuilder;
if (fields == null) {
selectBuilder = QueryBuilder.select().all();
}
else {
selectBuilder = QueryBuilder.select();
for (String col : fields) {
((Select.Selection) selectBuilder).column(col);
}
}
stmt = selectBuilder.from(table);
//The statement builder is not setup right for tokens.
// So, we need to build it manually.
String initialStmt = stmt.toString();
StringBuilder scanStmt = new StringBuilder();
scanStmt.append(
initialStmt.substring(0, initialStmt.length()-1));
scanStmt.append(" WHERE ");
scanStmt.append(QueryBuilder.token(YCSB_KEY));
scanStmt.append(" >= ");
scanStmt.append("token('");
scanStmt.append(startkey);
scanStmt.append("')");
scanStmt.append(" LIMIT ");
scanStmt.append(recordcount);
stmt = new SimpleStatement(scanStmt.toString());
stmt.setConsistencyLevel(readConsistencyLevel);
if (_debug) {
System.out.println(stmt.toString());
}
ResultSet rs = session.execute(stmt);
HashMap<String, ByteIterator> tuple;
while (!rs.isExhausted()) {
Row row = rs.one();
tuple = new HashMap<String, ByteIterator> ();
ColumnDefinitions cd = row.getColumnDefinitions();
for (ColumnDefinitions.Definition def : cd) {
ByteBuffer val = row.getBytesUnsafe(def.getName());
if (val != null) {
tuple.put(def.getName(),
new ByteArrayByteIterator(val.array()));
}
else {
tuple.put(def.getName(), null);
}
}
result.add(tuple);
}
return OK;
} catch (Exception e) {
e.printStackTrace();
System.out.println("Error scanning with startkey: " + startkey);
return ERR;
}
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table The name of the table
* @param key The record key of the record to write.
* @param values A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public int update(String table, String key, HashMap<String, ByteIterator> values) {
//Insert and updates provide the same functionality
return insert(table, key, values);
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key.
*
* @param table The name of the table
* @param key The record key of the record to insert.
* @param values A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public int insert(String table, String key, HashMap<String, ByteIterator> values) {
try {
Insert insertStmt = QueryBuilder.insertInto(table);
//Add key
insertStmt.value(YCSB_KEY, key);
//Add fields
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
Object value;
ByteIterator byteIterator = entry.getValue();
value = byteIterator.toString();
insertStmt.value(entry.getKey(), value);
}
insertStmt.setConsistencyLevel(writeConsistencyLevel).enableTracing();
if (_debug) {
System.out.println(insertStmt.toString());
}
ResultSet rs = session.execute(insertStmt);
return OK;
} catch (Exception e) {
e.printStackTrace();
}
return ERR;
}
/**
* Delete a record from the database.
*
* @param table The name of the table
* @param key The record key of the record to delete.
* @return Zero on success, a non-zero error code on error
*/
@Override
public int delete(String table, String key) {
try {
Statement stmt;
stmt = QueryBuilder.delete().from(table).where(QueryBuilder.eq(YCSB_KEY, key));
stmt.setConsistencyLevel(writeConsistencyLevel);
if (_debug) {
System.out.println(stmt.toString());
}
ResultSet rs = session.execute(stmt);
return OK;
} catch (Exception e) {
e.printStackTrace();
System.out.println("Error deleting key: " + key);
}
return ERR;
}
}
/**
* Copyright (c) 2015 YCSB contributors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import com.google.common.collect.Sets;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.measurements.Measurements;
import com.yahoo.ycsb.workloads.CoreWorkload;
import org.cassandraunit.CassandraCQLUnit;
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
* Integration tests for the Cassandra client
*/
public class CassandraCQLClientTest {
private final static String TABLE = "usertable";
private final static String HOST = "localhost";
private final static int PORT = 9142;
private final static String DEFAULT_ROW_KEY = "user1";
private CassandraCQLClient client;
private Session session;
@ClassRule
public static CassandraCQLUnit cassandraUnit =
new CassandraCQLUnit(new ClassPathCQLDataSet("ycsb.cql", "ycsb"));
@Before
public void setUpClient() throws Exception {
Properties p = new Properties();
p.setProperty("hosts", HOST);
p.setProperty("port", Integer.toString(PORT));
p.setProperty("table", TABLE);
Measurements.setProperties(p);
final CoreWorkload workload = new CoreWorkload();
workload.init(p);
client = new CassandraCQLClient();
client.setProperties(p);
client.init();
}
@Before
public void setSession() {
session = cassandraUnit.getSession();
}
@After
public void tearDownClient() throws Exception {
client.cleanup();
client = null;
}
@After
public void clearTable() throws Exception {
// Clear the table so that each test starts fresh.
final Statement truncate = QueryBuilder.truncate(TABLE);
cassandraUnit.getSession().execute(truncate);
}
@Test
public void testReadMissingRow() throws Exception {
final HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>();
final int status = client.read(TABLE, "Missing row", null, result);
assertThat(result.size(), is(0));
assertThat(status, is(CassandraCQLClient.NOT_FOUND));
}
private void insertRow() {
final String rowKey = DEFAULT_ROW_KEY;
Insert insertStmt = QueryBuilder.insertInto(TABLE);
insertStmt.value(CassandraCQLClient.YCSB_KEY, rowKey);
insertStmt.value("field0", "value1");
insertStmt.value("field1", "value2");
session.execute(insertStmt);
}
@Test
public void testRead() throws Exception {
insertRow();
final HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>();
final int status = client.read(CoreWorkload.table, DEFAULT_ROW_KEY, null, result);
assertThat(status, is(CassandraCQLClient.OK));
assertThat(result.entrySet(), hasSize(11));
assertThat(result, hasEntry("field2", null));
final HashMap<String, String> strResult = new HashMap<String, String>();
for (final Map.Entry<String, ByteIterator> e : result.entrySet()) {
if (e.getValue() != null) {
strResult.put(e.getKey(), e.getValue().toString());
}
}
assertThat(strResult, hasEntry(CassandraCQLClient.YCSB_KEY, DEFAULT_ROW_KEY));
assertThat(strResult, hasEntry("field0", "value1"));
assertThat(strResult, hasEntry("field1", "value2"));
}
@Test
public void testReadSingleColumn() throws Exception {
insertRow();
final HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>();
final Set<String> fields = Sets.newHashSet("field1");
final int status = client.read(CoreWorkload.table, DEFAULT_ROW_KEY, fields, result);
assertThat(status, is(CassandraCQLClient.OK));
assertThat(result.entrySet(), hasSize(1));
final Map<String, String> strResult = StringByteIterator.getStringMap(result);
assertThat(strResult, hasEntry("field1", "value2"));
}
@Test
public void testUpdate() throws Exception {
final String key = "key";
final HashMap<String, String> input = new HashMap<String, String>();
input.put("field0", "value1");
input.put("field1", "value2");
final int status = client.insert(TABLE, key, StringByteIterator.getByteIteratorMap(input));
assertThat(status, is(CassandraCQLClient.OK));
// Verify result
final Select selectStmt =
QueryBuilder.select("field0", "field1")
.from(TABLE)
.where(QueryBuilder.eq(CassandraCQLClient.YCSB_KEY, key))
.limit(1);
final ResultSet rs = session.execute(selectStmt);
final Row row = rs.one();
assertThat(row, notNullValue());
assertThat(rs.isExhausted(), is(true));
assertThat(row.getString("field0"), is("value1"));
assertThat(row.getString("field1"), is("value2"));
}
}
CREATE TABLE usertable (
y_id varchar primary key,
field0 varchar,
field1 varchar,
field2 varchar,
field3 varchar,
field4 varchar,
field5 varchar,
field6 varchar,
field7 varchar,
field8 varchar,
field9 varchar);
...@@ -75,6 +75,7 @@ LICENSE file. ...@@ -75,6 +75,7 @@ LICENSE file.
<accumulo.version>1.6.0</accumulo.version> <accumulo.version>1.6.0</accumulo.version>
<cassandra.version>1.2.9</cassandra.version> <cassandra.version>1.2.9</cassandra.version>
<cassandra.cql.version>1.0.3</cassandra.cql.version> <cassandra.cql.version>1.0.3</cassandra.cql.version>
<cassandra2.cql.version>2.1.8</cassandra2.cql.version>
<gemfire.version>8.1.0</gemfire.version> <gemfire.version>8.1.0</gemfire.version>
<infinispan.version>7.2.2.Final</infinispan.version> <infinispan.version>7.2.2.Final</infinispan.version>
<kudu.version>0.5.0</kudu.version> <kudu.version>0.5.0</kudu.version>
...@@ -101,6 +102,7 @@ LICENSE file. ...@@ -101,6 +102,7 @@ LICENSE file.
<module>accumulo</module> <module>accumulo</module>
<module>aerospike</module> <module>aerospike</module>
<module>cassandra</module> <module>cassandra</module>
<module>cassandra2</module>
<module>couchbase</module> <module>couchbase</module>
<module>distribution</module> <module>distribution</module>
<module>dynamodb</module> <module>dynamodb</module>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment