Skip to content
Snippets Groups Projects
Commit 54f43f44 authored by Sean Busbey's avatar Sean Busbey Committed by GitHub
Browse files

Merge pull request #805 from busbey/YCSB-794

Remove deprecated Cassandra clients
parents 7e34f5fd 886ef9c3
No related branches found
No related tags found
No related merge requests found
Showing
with 203 additions and 2459 deletions
......@@ -29,9 +29,6 @@ accumulo:com.yahoo.ycsb.db.accumulo.AccumuloClient
aerospike:com.yahoo.ycsb.db.AerospikeClient
asynchbase:com.yahoo.ycsb.db.AsyncHBaseClient
basic:com.yahoo.ycsb.BasicDB
cassandra-7:com.yahoo.ycsb.db.CassandraClient7
cassandra-8:com.yahoo.ycsb.db.CassandraClient8
cassandra-10:com.yahoo.ycsb.db.CassandraClient10
cassandra-cql:com.yahoo.ycsb.db.CassandraCQLClient
cassandra2-cql:com.yahoo.ycsb.db.CassandraCQLClient
couchbase:com.yahoo.ycsb.db.CouchbaseClient
......
......@@ -55,9 +55,6 @@ DATABASES = {
"aerospike" : "com.yahoo.ycsb.db.AerospikeClient",
"asynchbase" : "com.yahoo.ycsb.db.AsyncHBaseClient",
"basic" : "com.yahoo.ycsb.BasicDB",
"cassandra-7" : "com.yahoo.ycsb.db.CassandraClient7",
"cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8",
"cassandra-10" : "com.yahoo.ycsb.db.CassandraClient10",
"cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient",
"cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient",
"couchbase" : "com.yahoo.ycsb.db.CouchbaseClient",
......@@ -243,11 +240,11 @@ def main():
# Classpath set up
binding = args.database.split("-")[0]
# Deprecation message for the entire cassandra-binding
if binding == "cassandra":
warn("The 'cassandra-7', 'cassandra-8', 'cassandra-10', and "
"cassandra-cql' clients are deprecated. If you are using "
"Cassandra 2.X try using the 'cassandra2-cql' client instead.")
if binding == "cassandra2":
warn("The 'cassandra2-cql' client has been deprecated. It has been "
"renamed to simply 'cassandra-cql'. This alias will be removed"
" in the next YCSB release.")
binding = "cassandra"
if binding == "couchbase":
warn("The 'couchbase' client has been deprecated. If you are using "
......
......@@ -117,6 +117,12 @@ GOTO confAdded
SET CLASSPATH=%YCSB_HOME%\conf
:confAdded
@REM Cassandra2 deprecation message
IF NOT "%BINDING_DIR%" == "cassandra2" GOTO notAliasCassandra
echo [WARN] The 'cassandra2-cql' client has been deprecated. It has been renamed to simply 'cassandra-cql'. This alias will be removed in the next YCSB release.
SET BINDING_DIR=cassandra
:notAliasCassandra
@REM Build classpath according to source checkout or release distribution
IF EXIST "%YCSB_HOME%\pom.xml" GOTO gotSource
......@@ -178,11 +184,6 @@ FOR %%F IN (%YCSB_HOME%\%BINDING_DIR%\target\dependency\*.jar) DO (
:classpathComplete
@REM Cassandra deprecation message
IF NOT "%BINDING_DIR%" == "cassandra" GOTO notOldCassandra
echo [WARN] The 'cassandra-7', 'cassandra-8', 'cassandra-10', and cassandra-cql' clients are deprecated. If you are using Cassandra 2.X try using the 'cassandra2-cql' client instead.
:notOldCassandra
@REM Couchbase deprecation message
IF NOT "%BINDING_DIR%" == "couchbase" GOTO notOldCouchbase
echo [WARN] The 'couchbase' client is deprecated. If you are using Couchbase 4.0+ try using the 'couchbase2' client instead.
......
......@@ -133,6 +133,14 @@ else
CLASSPATH="$CLASSPATH:$YCSB_HOME/conf"
fi
# Cassandra2 deprecation message
if [ "${BINDING_DIR}" = "cassandra2" ] ; then
echo "[WARN] The 'cassandra2-cql' client has been deprecated. It has been \
renamed to simply 'cassandra-cql'. This alias will be removed in the next \
YCSB release."
BINDING_DIR="cassandra"
fi
# Build classpath
# The "if" check after the "for" is because glob may just return the pattern
# when no files are found. The "if" makes sure the file is really there.
......@@ -208,13 +216,6 @@ else
done
fi
# Cassandra deprecation message
if [ "$BINDING_DIR" = "cassandra" ] ; then
echo "[WARN] The 'cassandra-7', 'cassandra-8', 'cassandra-10', and \
cassandra-cql' clients are deprecated. If you are using \
Cassandra 2.X try using the 'cassandra2-cql' client instead."
fi
# Couchbase deprecation message
if [ "${BINDING_DIR}" = "couchbase" ] ; then
echo "[WARN] The 'couchbase' client is deprecated. If you are using \
......
......@@ -14,59 +14,67 @@ implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
# THIS BINDING IS DEPRECATED
----------------------------
Date of removal from YCSB: **March 2016**
Due to the low amount of use and support for older Cassandra lineages (0.X and 1.X), YCSB will not support clients for these versions either.
# Apache Cassandra 2.x CQL binding
For Cassandra 2.X use the ```cassandra2-cql``` client: https://github.com/brianfrankcooper/YCSB/tree/master/cassandra2.
Binding for [Apache Cassandra](http://cassandra.apache.org), using the CQL API
via the [DataStax
driver](http://docs.datastax.com/en/developer/java-driver/2.1/java-driver/whatsNew2.html).
# Cassandra (0.7, 0.8, 1.x) drivers for YCSB
To run against the (deprecated) Cassandra Thrift API, use the `cassandra-10` binding.
**For Cassandra 2 CQL support, use the `cassandra2-cql` binding. The Thrift drivers below are deprecated, and the CQL driver here does not support Cassandra 2.1+.**
## Creating a table for use with YCSB
There are three drivers in the Cassandra binding:
For keyspace `ycsb`, table `usertable`:
* `cassandra-7`: Cassandra 0.7 Thrift binding.
* `cassandra-8`: Cassandra 0.8 Thrift binding.
* `cassandra-10`: Cassandra 1.0+ Thrift binding.
* `cassandra-cql`: Cassandra CQL binding, for Cassandra 1.x to 2.0. See `cassandra2/README.md` for details on parameters.
# `cassandra-10`
## Creating a table
Using `cassandra-cli`:
create keyspace usertable with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options = {replication_factor:1};
create column family data with column_type = 'Standard' and comparator = 'UTF8Type';
cqlsh> create keyspace ycsb
WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 3 };
cqlsh> USE ycsb;
cqlsh> create table usertable (
y_id varchar primary key,
field0 varchar,
field1 varchar,
field2 varchar,
field3 varchar,
field4 varchar,
field5 varchar,
field6 varchar,
field7 varchar,
field8 varchar,
field9 varchar);
**Note that `replication_factor` and consistency levels (below) will affect performance.**
## Configuration Parameters
## Cassandra Configuration Parameters
- `hosts` (**required**)
- Cassandra nodes to connect to.
- No default.
* `port`
- Thrift port for communicating with Cassandra cluster.
* Default is `9160`.
* CQL port for communicating with Cassandra cluster.
* Default is `9042`.
- `cassandra.keyspace`
Keyspace name - must match the keyspace for the table created (see above).
See http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html for details.
- `cassandra.columnfamily`
- Column family name - must match the column family for the table created (see above).
- Default value is `data`
- Default value is `ycsb`
- `cassandra.username`
- `cassandra.password`
- Optional user name and password for authentication. See http://docs.datastax.com/en/cassandra/2.0/cassandra/security/security_config_native_authenticate_t.html for details.
* `cassandra.readconsistencylevel`
* `cassandra.scanconsistencylevel`
* `cassandra.writeconsistencylevel`
- Default value is `ONE`
* Default value is `ONE`
- Consistency level for reads and writes, respectively. See the [DataStax documentation](http://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html) for details.
- *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above.
* *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above.
* `cassandra.maxconnections`
* `cassandra.coreconnections`
* Defaults for max and core connections can be found here: https://datastax.github.io/java-driver/2.1.8/features/pooling/#pool-size. Cassandra 2.0.X falls under protocol V2, Cassandra 2.1+ falls under protocol V3.
* `cassandra.connecttimeoutmillis`
* `cassandra.readtimeoutmillis`
* Defaults for connect and read timeouts can be found here: https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/SocketOptions.html.
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2012 - 2016 YCSB contributors. All rights reserved.
<!--
Copyright (c) 2012-2016 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
......@@ -16,36 +17,65 @@ permissions and limitations under the License. See accompanying
LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.11.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.11.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>
<artifactId>cassandra-binding</artifactId>
<name>Cassandra DB Binding</name>
<name>Cassandra 2.1+ DB Binding</name>
<packaging>jar</packaging>
<properties>
<!-- Skip tests by default. will be activated by jdk8 profile -->
<skipTests>true</skipTests>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>${cassandra.version}</version>
</dependency>
<!-- CQL driver -->
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra.cql.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- CQL driver -->
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra.cql.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<version>3.0.0.1</version>
<classifier>shaded</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<!-- Cassandra 2.2+ requires JDK8 to run, so none of our tests
will work unless we're using jdk8.
-->
<profile>
<id>jdk8-tests</id>
<activation>
<jdk>1.8</jdk>
</activation>
<properties>
<skipTests>false</skipTests>
</properties>
</profile>
</profiles>
</project>
/**
* Copyright (c) 2013 Yahoo! Inc. All rights reserved.
* Copyright (c) 2013-2015 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
......@@ -17,24 +17,37 @@
*/
package com.yahoo.ycsb.db;
import com.datastax.driver.core.*;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.yahoo.ycsb.*;
import java.nio.ByteBuffer;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashMap;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Tested with Cassandra 2.0, CQL client for YCSB framework
* Cassandra 2.x CQL client.
*
* See {@code cassandra2} for a version compatible with Cassandra 2.1+. See
* {@code cassandra2/README.md} for details.
* See {@code cassandra2/README.md} for details.
*
* @author cmatser
*/
......@@ -54,6 +67,7 @@ public class CassandraCQLClient extends DB {
public static final String HOSTS_PROPERTY = "hosts";
public static final String PORT_PROPERTY = "port";
public static final String PORT_PROPERTY_DEFAULT = "9042";
public static final String READ_CONSISTENCY_LEVEL_PROPERTY =
"cassandra.readconsistencylevel";
......@@ -62,6 +76,15 @@ public class CassandraCQLClient extends DB {
"cassandra.writeconsistencylevel";
public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
public static final String MAX_CONNECTIONS_PROPERTY =
"cassandra.maxconnections";
public static final String CORE_CONNECTIONS_PROPERTY =
"cassandra.coreconnections";
public static final String CONNECT_TIMEOUT_MILLIS_PROPERTY =
"cassandra.connecttimeoutmillis";
public static final String READ_TIMEOUT_MILLIS_PROPERTY =
"cassandra.readtimeoutmillis";
/**
* Count the number of times initialized to teardown on the last
* {@link #cleanup()}.
......@@ -101,12 +124,7 @@ public class CassandraCQLClient extends DB {
HOSTS_PROPERTY));
}
String[] hosts = host.split(",");
String port = getProperties().getProperty("port", "9042");
if (port == null) {
throw new DBException(String.format(
"Required property \"%s\" missing for CassandraCQLClient",
PORT_PROPERTY));
}
String port = getProperties().getProperty(PORT_PROPERTY, PORT_PROPERTY_DEFAULT);
String username = getProperties().getProperty(USERNAME_PROPERTY);
String password = getProperties().getProperty(PASSWORD_PROPERTY);
......@@ -121,7 +139,6 @@ public class CassandraCQLClient extends DB {
getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY,
WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
// public void connect(String node) {}
if ((username != null) && !username.isEmpty()) {
cluster = Cluster.builder().withCredentials(username, password)
.withPort(Integer.valueOf(port)).addContactPoints(hosts).build();
......@@ -130,21 +147,38 @@ public class CassandraCQLClient extends DB {
.addContactPoints(hosts).build();
}
// Update number of connections based on threads
int threadcount =
Integer.parseInt(getProperties().getProperty("threadcount", "1"));
cluster.getConfiguration().getPoolingOptions()
.setMaxConnectionsPerHost(HostDistance.LOCAL, threadcount);
String maxConnections = getProperties().getProperty(
MAX_CONNECTIONS_PROPERTY);
if (maxConnections != null) {
cluster.getConfiguration().getPoolingOptions()
.setMaxConnectionsPerHost(HostDistance.LOCAL,
Integer.valueOf(maxConnections));
}
String coreConnections = getProperties().getProperty(
CORE_CONNECTIONS_PROPERTY);
if (coreConnections != null) {
cluster.getConfiguration().getPoolingOptions()
.setCoreConnectionsPerHost(HostDistance.LOCAL,
Integer.valueOf(coreConnections));
}
// Set connection timeout 3min (default is 5s)
cluster.getConfiguration().getSocketOptions()
.setConnectTimeoutMillis(3 * 60 * 1000);
// Set read (execute) timeout 3min (default is 12s)
cluster.getConfiguration().getSocketOptions()
.setReadTimeoutMillis(3 * 60 * 1000);
String connectTimoutMillis = getProperties().getProperty(
CONNECT_TIMEOUT_MILLIS_PROPERTY);
if (connectTimoutMillis != null) {
cluster.getConfiguration().getSocketOptions()
.setConnectTimeoutMillis(Integer.valueOf(connectTimoutMillis));
}
String readTimoutMillis = getProperties().getProperty(
READ_TIMEOUT_MILLIS_PROPERTY);
if (readTimoutMillis != null) {
cluster.getConfiguration().getSocketOptions()
.setReadTimeoutMillis(Integer.valueOf(readTimoutMillis));
}
Metadata metadata = cluster.getMetadata();
System.out.printf("Connected to cluster: %s\n",
System.err.printf("Connected to cluster: %s\n",
metadata.getClusterName());
for (Host discoveredHost : metadata.getAllHosts()) {
......@@ -167,8 +201,19 @@ public class CassandraCQLClient extends DB {
*/
@Override
public void cleanup() throws DBException {
if (INIT_COUNT.decrementAndGet() <= 0) {
cluster.shutdown();
synchronized (INIT_COUNT) {
final int curInitCount = INIT_COUNT.decrementAndGet();
if (curInitCount <= 0) {
session.close();
cluster.close();
cluster = null;
session = null;
}
if (curInitCount < 0) {
// This should never happen.
throw new DBException(
String.format("initCount is negative: %d", curInitCount));
}
}
}
......@@ -189,7 +234,6 @@ public class CassandraCQLClient extends DB {
@Override
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
try {
Statement stmt;
Select.Builder selectBuilder;
......@@ -213,20 +257,21 @@ public class CassandraCQLClient extends DB {
ResultSet rs = session.execute(stmt);
if (rs.isExhausted()) {
return Status.NOT_FOUND;
}
// Should be only 1 row
if (!rs.isExhausted()) {
Row row = rs.one();
ColumnDefinitions cd = row.getColumnDefinitions();
Row row = rs.one();
ColumnDefinitions cd = row.getColumnDefinitions();
for (ColumnDefinitions.Definition def : cd) {
ByteBuffer val = row.getBytesUnsafe(def.getName());
if (val != null) {
result.put(def.getName(), new ByteArrayByteIterator(val.array()));
} else {
result.put(def.getName(), null);
}
for (ColumnDefinitions.Definition def : cd) {
ByteBuffer val = row.getBytesUnsafe(def.getName());
if (val != null) {
result.put(def.getName(), new ByteArrayByteIterator(val.array()));
} else {
result.put(def.getName(), null);
}
}
return Status.OK;
......@@ -242,7 +287,7 @@ public class CassandraCQLClient extends DB {
/**
* Perform a range scan for a set of records in the database. Each field/value
* pair from the result will be stored in a HashMap.
*
*
* Cassandra CQL uses "token" method for range scan which doesn't always yield
* intuitive results.
*
......
/**
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.Utils;
import org.apache.cassandra.thrift.AuthenticationRequest;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
//XXXX if we do replication, fix the consistency levels
/**
* Cassandra 1.0.6 client for YCSB framework.
*/
public class CassandraClient10 extends DB {
public static final int OK = 0;
public static final int ERROR = -1;
public static final ByteBuffer EMPTY_BYTE_BUFFER =
ByteBuffer.wrap(new byte[0]);
public static final String CONNECTION_RETRY_PROPERTY =
"cassandra.connectionretries";
public static final String CONNECTION_RETRY_PROPERTY_DEFAULT = "300";
public static final String OPERATION_RETRY_PROPERTY =
"cassandra.operationretries";
public static final String OPERATION_RETRY_PROPERTY_DEFAULT = "300";
public static final String USERNAME_PROPERTY = "cassandra.username";
public static final String PASSWORD_PROPERTY = "cassandra.password";
public static final String COLUMN_FAMILY_PROPERTY = "cassandra.columnfamily";
public static final String COLUMN_FAMILY_PROPERTY_DEFAULT = "data";
public static final String READ_CONSISTENCY_LEVEL_PROPERTY =
"cassandra.readconsistencylevel";
public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY =
"cassandra.writeconsistencylevel";
public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
public static final String SCAN_CONSISTENCY_LEVEL_PROPERTY =
"cassandra.scanconsistencylevel";
public static final String SCAN_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
public static final String DELETE_CONSISTENCY_LEVEL_PROPERTY =
"cassandra.deleteconsistencylevel";
public static final String DELETE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
private int connectionRetries;
private int operationRetries;
private String columnFamily;
private TTransport tr;
private Cassandra.Client client;
private boolean debug = false;
private String tableName = "";
private Exception errorexception = null;
private List<Mutation> mutations = new ArrayList<Mutation>();
private Map<String, List<Mutation>> mutationMap =
new HashMap<String, List<Mutation>>();
private Map<ByteBuffer, Map<String, List<Mutation>>> record =
new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
private ColumnParent parent;
private ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE;
private ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE;
private ConsistencyLevel scanConsistencyLevel = ConsistencyLevel.ONE;
private ConsistencyLevel deleteConsistencyLevel = ConsistencyLevel.ONE;
/**
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
public void init() throws DBException {
String hosts = getProperties().getProperty("hosts");
if (hosts == null) {
throw new DBException(
"Required property \"hosts\" missing for CassandraClient");
}
columnFamily = getProperties().getProperty(COLUMN_FAMILY_PROPERTY,
COLUMN_FAMILY_PROPERTY_DEFAULT);
parent = new ColumnParent(columnFamily);
connectionRetries =
Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY,
CONNECTION_RETRY_PROPERTY_DEFAULT));
operationRetries =
Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY,
OPERATION_RETRY_PROPERTY_DEFAULT));
String username = getProperties().getProperty(USERNAME_PROPERTY);
String password = getProperties().getProperty(PASSWORD_PROPERTY);
readConsistencyLevel = ConsistencyLevel
.valueOf(getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY,
READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
writeConsistencyLevel = ConsistencyLevel
.valueOf(getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY,
WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
scanConsistencyLevel = ConsistencyLevel
.valueOf(getProperties().getProperty(SCAN_CONSISTENCY_LEVEL_PROPERTY,
SCAN_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
deleteConsistencyLevel = ConsistencyLevel
.valueOf(getProperties().getProperty(DELETE_CONSISTENCY_LEVEL_PROPERTY,
DELETE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false"));
String[] allhosts = hosts.split(",");
String myhost = allhosts[Utils.random().nextInt(allhosts.length)];
Exception connectexception = null;
for (int retry = 0; retry < connectionRetries; retry++) {
tr = new TFramedTransport(new TSocket(myhost,
Integer.parseInt(getProperties().getProperty("port", "9160"))));
TProtocol proto = new TBinaryProtocol(tr);
client = new Cassandra.Client(proto);
try {
tr.open();
connectexception = null;
break;
} catch (Exception e) {
connectexception = e;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (connectexception != null) {
System.err.println("Unable to connect to " + myhost + " after "
+ connectionRetries + " tries");
throw new DBException(connectexception);
}
if (username != null && password != null) {
Map<String, String> cred = new HashMap<String, String>();
cred.put("username", username);
cred.put("password", password);
AuthenticationRequest req = new AuthenticationRequest(cred);
try {
client.login(req);
} catch (Exception e) {
throw new DBException(e);
}
}
}
/**
* Cleanup any state for this DB. Called once per DB instance; there is one DB
* instance per client thread.
*/
public void cleanup() throws DBException {
tr.close();
}
/**
* Read a record from the database. Each field/value pair from the result will
* be stored in a HashMap.
*
* @param table
* The name of the table
* @param key
* The record key of the record to read.
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error
*/
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
if (!tableName.equals(table)) {
try {
client.set_keyspace(table);
tableName = table;
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
return Status.ERROR;
}
}
for (int i = 0; i < operationRetries; i++) {
try {
SlicePredicate predicate;
if (fields == null) {
predicate = new SlicePredicate().setSlice_range(new SliceRange(
EMPTY_BYTE_BUFFER, EMPTY_BYTE_BUFFER, false, 1000000));
} else {
ArrayList<ByteBuffer> fieldlist =
new ArrayList<ByteBuffer>(fields.size());
for (String s : fields) {
fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8")));
}
predicate = new SlicePredicate().setColumn_names(fieldlist);
}
List<ColumnOrSuperColumn> results =
client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent,
predicate, readConsistencyLevel);
if (debug) {
System.out.print("Reading key: " + key);
}
Column column;
String name;
ByteIterator value;
for (ColumnOrSuperColumn oneresult : results) {
column = oneresult.column;
name = new String(column.name.array(),
column.name.position() + column.name.arrayOffset(),
column.name.remaining());
value = new ByteArrayByteIterator(column.value.array(),
column.value.position() + column.value.arrayOffset(),
column.value.remaining());
result.put(name, value);
if (debug) {
System.out.print("(" + name + "=" + value + ")");
}
}
if (debug) {
System.out.println();
System.out
.println("ConsistencyLevel=" + readConsistencyLevel.toString());
}
return Status.OK;
} catch (Exception e) {
errorexception = e;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Status.ERROR;
}
/**
* Perform a range scan for a set of records in the database. Each field/value
* pair from the result will be stored in a HashMap.
*
* @param table
* The name of the table
* @param startkey
* The record key of the first record to read.
* @param recordcount
* The number of records to read
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one record
* @return Zero on success, a non-zero error code on error
*/
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
if (!tableName.equals(table)) {
try {
client.set_keyspace(table);
tableName = table;
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
return Status.ERROR;
}
}
for (int i = 0; i < operationRetries; i++) {
try {
SlicePredicate predicate;
if (fields == null) {
predicate = new SlicePredicate().setSlice_range(new SliceRange(
EMPTY_BYTE_BUFFER, EMPTY_BYTE_BUFFER, false, 1000000));
} else {
ArrayList<ByteBuffer> fieldlist =
new ArrayList<ByteBuffer>(fields.size());
for (String s : fields) {
fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8")));
}
predicate = new SlicePredicate().setColumn_names(fieldlist);
}
KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8"))
.setEnd_key(new byte[] {}).setCount(recordcount);
List<KeySlice> results = client.get_range_slices(parent, predicate, kr,
scanConsistencyLevel);
if (debug) {
System.out.println("Scanning startkey: " + startkey);
}
HashMap<String, ByteIterator> tuple;
for (KeySlice oneresult : results) {
tuple = new HashMap<String, ByteIterator>();
Column column;
String name;
ByteIterator value;
for (ColumnOrSuperColumn onecol : oneresult.columns) {
column = onecol.column;
name = new String(column.name.array(),
column.name.position() + column.name.arrayOffset(),
column.name.remaining());
value = new ByteArrayByteIterator(column.value.array(),
column.value.position() + column.value.arrayOffset(),
column.value.remaining());
tuple.put(name, value);
if (debug) {
System.out.print("(" + name + "=" + value + ")");
}
}
result.add(tuple);
if (debug) {
System.out.println();
System.out
.println("ConsistencyLevel=" + scanConsistencyLevel.toString());
}
}
return Status.OK;
} catch (Exception e) {
errorexception = e;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Status.ERROR;
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table
* The name of the table
* @param key
* The record key of the record to write.
* @param values
* A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error
*/
public Status update(String table, String key,
HashMap<String, ByteIterator> values) {
return insert(table, key, values);
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key.
*
* @param table
* The name of the table
* @param key
* The record key of the record to insert.
* @param values
* A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error
*/
public Status insert(String table, String key,
HashMap<String, ByteIterator> values) {
if (!tableName.equals(table)) {
try {
client.set_keyspace(table);
tableName = table;
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
return Status.ERROR;
}
}
for (int i = 0; i < operationRetries; i++) {
mutations.clear();
mutationMap.clear();
record.clear();
if (debug) {
System.out.println("Inserting key: " + key);
}
try {
ByteBuffer wrappedKey = ByteBuffer.wrap(key.getBytes("UTF-8"));
Column col;
ColumnOrSuperColumn column;
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
col = new Column();
col.setName(ByteBuffer.wrap(entry.getKey().getBytes("UTF-8")));
col.setValue(ByteBuffer.wrap(entry.getValue().toArray()));
col.setTimestamp(System.currentTimeMillis());
column = new ColumnOrSuperColumn();
column.setColumn(col);
mutations.add(new Mutation().setColumn_or_supercolumn(column));
}
mutationMap.put(columnFamily, mutations);
record.put(wrappedKey, mutationMap);
client.batch_mutate(record, writeConsistencyLevel);
if (debug) {
System.out
.println("ConsistencyLevel=" + writeConsistencyLevel.toString());
}
return Status.OK;
} catch (Exception e) {
errorexception = e;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Status.ERROR;
}
/**
* Delete a record from the database.
*
* @param table
* The name of the table
* @param key
* The record key of the record to delete.
* @return Zero on success, a non-zero error code on error
*/
public Status delete(String table, String key) {
if (!tableName.equals(table)) {
try {
client.set_keyspace(table);
tableName = table;
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
return Status.ERROR;
}
}
for (int i = 0; i < operationRetries; i++) {
try {
client.remove(ByteBuffer.wrap(key.getBytes("UTF-8")),
new ColumnPath(columnFamily), System.currentTimeMillis(),
deleteConsistencyLevel);
if (debug) {
System.out.println("Delete key: " + key);
System.out
.println("ConsistencyLevel=" + deleteConsistencyLevel.toString());
}
return Status.OK;
} catch (Exception e) {
errorexception = e;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Status.ERROR;
}
public static void main(String[] args) {
CassandraClient10 cli = new CassandraClient10();
Properties props = new Properties();
props.setProperty("hosts", args[0]);
cli.setProperties(props);
try {
cli.init();
} catch (Exception e) {
e.printStackTrace();
System.exit(0);
}
HashMap<String, ByteIterator> vals = new HashMap<String, ByteIterator>();
vals.put("age", new StringByteIterator("57"));
vals.put("middlename", new StringByteIterator("bradley"));
vals.put("favoritecolor", new StringByteIterator("blue"));
Status res = cli.insert("usertable", "BrianFrankCooper", vals);
System.out.println("Result of insert: " + res);
HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>();
HashSet<String> fields = new HashSet<String>();
fields.add("middlename");
fields.add("age");
fields.add("favoritecolor");
res = cli.read("usertable", "BrianFrankCooper", null, result);
System.out.println("Result of read: " + res);
for (Map.Entry<String, ByteIterator> entry : result.entrySet()) {
System.out.println("[" + entry.getKey() + "]=[" + entry.getValue() + "]");
}
res = cli.delete("usertable", "BrianFrankCooper");
System.out.println("Result of delete: " + res);
}
}
/**
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.Utils;
import org.apache.cassandra.thrift.AuthenticationRequest;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
//XXXX if we do replication, fix the consistency levels
/**
* Cassandra 0.7 client for YCSB framework.
*/
public class CassandraClient7 extends DB {
public static final ByteBuffer EMPTY_BYTE_BUFFER =
ByteBuffer.wrap(new byte[0]);
public static final String CONNECTION_RETRY_PROPERTY =
"cassandra.connectionretries";
public static final String CONNECTION_RETRY_PROPERTY_DEFAULT = "300";
public static final String OPERATION_RETRY_PROPERTY =
"cassandra.operationretries";
public static final String OPERATION_RETRY_PROPERTY_DEFAULT = "300";
public static final String USERNAME_PROPERTY = "cassandra.username";
public static final String PASSWORD_PROPERTY = "cassandra.password";
public static final String COLUMN_FAMILY_PROPERTY = "cassandra.columnfamily";
public static final String COLUMN_FAMILY_PROPERTY_DEFAULT = "data";
private int connectionRetries;
private int operationRetries;
private String columnFamily;
private TTransport tr;
private Cassandra.Client client;
private boolean debug = false;
private String tableName = "";
private Exception errorexception = null;
private List<Mutation> mutations = new ArrayList<Mutation>();
private Map<String, List<Mutation>> mutationMap =
new HashMap<String, List<Mutation>>();
private Map<ByteBuffer, Map<String, List<Mutation>>> record =
new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
private ColumnParent parent;
/**
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
public void init() throws DBException {
String hosts = getProperties().getProperty("hosts");
if (hosts == null) {
throw new DBException(
"Required property \"hosts\" missing for CassandraClient");
}
columnFamily = getProperties().getProperty(COLUMN_FAMILY_PROPERTY,
COLUMN_FAMILY_PROPERTY_DEFAULT);
parent = new ColumnParent(columnFamily);
connectionRetries =
Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY,
CONNECTION_RETRY_PROPERTY_DEFAULT));
operationRetries =
Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY,
OPERATION_RETRY_PROPERTY_DEFAULT));
String username = getProperties().getProperty(USERNAME_PROPERTY);
String password = getProperties().getProperty(PASSWORD_PROPERTY);
debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false"));
String[] allhosts = hosts.split(",");
String myhost = allhosts[Utils.random().nextInt(allhosts.length)];
Exception connectexception = null;
for (int retry = 0; retry < connectionRetries; retry++) {
tr = new TFramedTransport(new TSocket(myhost, 9160));
TProtocol proto = new TBinaryProtocol(tr);
client = new Cassandra.Client(proto);
try {
tr.open();
connectexception = null;
break;
} catch (Exception e) {
connectexception = e;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (connectexception != null) {
System.err.println("Unable to connect to " + myhost + " after "
+ connectionRetries + " tries");
System.out.println("Unable to connect to " + myhost + " after "
+ connectionRetries + " tries");
throw new DBException(connectexception);
}
if (username != null && password != null) {
Map<String, String> cred = new HashMap<String, String>();
cred.put("username", username);
cred.put("password", password);
AuthenticationRequest req = new AuthenticationRequest(cred);
try {
client.login(req);
} catch (Exception e) {
throw new DBException(e);
}
}
}
/**
* Cleanup any state for this DB. Called once per DB instance; there is one DB
* instance per client thread.
*/
public void cleanup() throws DBException {
tr.close();
}
/**
* Read a record from the database. Each field/value pair from the result will
* be stored in a HashMap.
*
* @param table
* The name of the table
* @param key
* The record key of the record to read.
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error
*/
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
if (!tableName.equals(table)) {
try {
client.set_keyspace(table);
tableName = table;
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
return Status.ERROR;
}
}
for (int i = 0; i < operationRetries; i++) {
try {
SlicePredicate predicate;
if (fields == null) {
SliceRange range = new SliceRange(EMPTY_BYTE_BUFFER,
EMPTY_BYTE_BUFFER, false, 1000000);
predicate = new SlicePredicate().setSlice_range(range);
} else {
ArrayList<ByteBuffer> fieldlist =
new ArrayList<ByteBuffer>(fields.size());
for (String s : fields) {
fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8")));
}
predicate = new SlicePredicate().setColumn_names(fieldlist);
}
List<ColumnOrSuperColumn> results =
client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent,
predicate, ConsistencyLevel.ONE);
if (debug) {
System.out.print("Reading key: " + key);
}
Column column;
String name;
ByteIterator value;
for (ColumnOrSuperColumn oneresult : results) {
column = oneresult.column;
name = new String(column.name.array(),
column.name.position() + column.name.arrayOffset(),
column.name.remaining());
value = new ByteArrayByteIterator(column.value.array(),
column.value.position() + column.value.arrayOffset(),
column.value.remaining());
result.put(name, value);
if (debug) {
System.out.print("(" + name + "=" + value + ")");
}
}
if (debug) {
System.out.println();
}
return Status.OK;
} catch (Exception e) {
errorexception = e;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Status.ERROR;
}
/**
* Perform a range scan for a set of records in the database. Each field/value
* pair from the result will be stored in a HashMap.
*
* @param table
* The name of the table
* @param startkey
* The record key of the first record to read.
* @param recordcount
* The number of records to read
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one record
* @return Zero on success, a non-zero error code on error
*/
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
if (!tableName.equals(table)) {
try {
client.set_keyspace(table);
tableName = table;
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
return Status.ERROR;
}
}
for (int i = 0; i < operationRetries; i++) {
try {
SlicePredicate predicate;
if (fields == null) {
SliceRange range = new SliceRange(EMPTY_BYTE_BUFFER,
EMPTY_BYTE_BUFFER, false, 1000000);
predicate = new SlicePredicate().setSlice_range(range);
} else {
ArrayList<ByteBuffer> fieldlist =
new ArrayList<ByteBuffer>(fields.size());
for (String s : fields) {
fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8")));
}
predicate = new SlicePredicate().setColumn_names(fieldlist);
}
KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8"))
.setEnd_key(new byte[] {}).setCount(recordcount);
List<KeySlice> results = client.get_range_slices(parent, predicate, kr,
ConsistencyLevel.ONE);
if (debug) {
System.out.println("Scanning startkey: " + startkey);
}
HashMap<String, ByteIterator> tuple;
for (KeySlice oneresult : results) {
tuple = new HashMap<String, ByteIterator>();
Column column;
String name;
ByteIterator value;
for (ColumnOrSuperColumn onecol : oneresult.columns) {
column = onecol.column;
name = new String(column.name.array(),
column.name.position() + column.name.arrayOffset(),
column.name.remaining());
value = new ByteArrayByteIterator(column.value.array(),
column.value.position() + column.value.arrayOffset(),
column.value.remaining());
tuple.put(name, value);
if (debug) {
System.out.print("(" + name + "=" + value + ")");
}
}
result.add(tuple);
if (debug) {
System.out.println();
}
}
return Status.OK;
} catch (Exception e) {
errorexception = e;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Status.ERROR;
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table
* The name of the table
* @param key
* The record key of the record to write.
* @param values
* A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error
*/
public Status update(String table, String key,
HashMap<String, ByteIterator> values) {
return insert(table, key, values);
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key.
*
* @param table
* The name of the table
* @param key
* The record key of the record to insert.
* @param values
* A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error
*/
public Status insert(String table, String key,
HashMap<String, ByteIterator> values) {
if (!tableName.equals(table)) {
try {
client.set_keyspace(table);
tableName = table;
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
return Status.ERROR;
}
}
for (int i = 0; i < operationRetries; i++) {
mutations.clear();
mutationMap.clear();
record.clear();
if (debug) {
System.out.println("Inserting key: " + key);
}
try {
ByteBuffer wrappedKey = ByteBuffer.wrap(key.getBytes("UTF-8"));
Column col;
ColumnOrSuperColumn column;
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
col = new Column();
col.setName(ByteBuffer.wrap(entry.getKey().getBytes("UTF-8")));
col.setValue(ByteBuffer.wrap(entry.getValue().toArray()));
col.setTimestamp(System.currentTimeMillis());
column = new ColumnOrSuperColumn();
column.setColumn(col);
mutations.add(new Mutation().setColumn_or_supercolumn(column));
}
mutationMap.put(columnFamily, mutations);
record.put(wrappedKey, mutationMap);
client.batch_mutate(record, ConsistencyLevel.ONE);
return Status.OK;
} catch (Exception e) {
errorexception = e;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Status.ERROR;
}
/**
* Delete a record from the database.
*
* @param table
* The name of the table
* @param key
* The record key of the record to delete.
* @return Zero on success, a non-zero error code on error
*/
public Status delete(String table, String key) {
if (!tableName.equals(table)) {
try {
client.set_keyspace(table);
tableName = table;
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
return Status.ERROR;
}
}
for (int i = 0; i < operationRetries; i++) {
try {
client.remove(ByteBuffer.wrap(key.getBytes("UTF-8")),
new ColumnPath(columnFamily), System.currentTimeMillis(),
ConsistencyLevel.ONE);
if (debug) {
System.out.println("Delete key: " + key);
}
return Status.OK;
} catch (Exception e) {
errorexception = e;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Status.ERROR;
}
public static void main(String[] args) {
CassandraClient7 cli = new CassandraClient7();
Properties props = new Properties();
props.setProperty("hosts", args[0]);
cli.setProperties(props);
try {
cli.init();
} catch (Exception e) {
e.printStackTrace();
System.exit(0);
}
HashMap<String, ByteIterator> vals = new HashMap<String, ByteIterator>();
vals.put("age", new StringByteIterator("57"));
vals.put("middlename", new StringByteIterator("bradley"));
vals.put("favoritecolor", new StringByteIterator("blue"));
Status res = cli.insert("usertable", "BrianFrankCooper", vals);
System.out.println("Result of insert: " + res.getName());
HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>();
HashSet<String> fields = new HashSet<String>();
fields.add("middlename");
fields.add("age");
fields.add("favoritecolor");
res = cli.read("usertable", "BrianFrankCooper", null, result);
System.out.println("Result of read: " + res.getName());
for (Map.Entry<String, ByteIterator> entry : result.entrySet()) {
System.out.println("[" + entry.getKey() + "]=[" + entry.getValue() + "]");
}
res = cli.delete("usertable", "BrianFrankCooper");
System.out.println("Result of delete: " + res.getName());
}
}
/**
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.yahoo.ycsb.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Vector;
import java.util.Properties;
import java.nio.ByteBuffer;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.cassandra.thrift.*;
//XXXX if we do replication, fix the consistency levels
/**
* Cassandra 0.8 client for YCSB framework.
*/
public class CassandraClient8 extends DB {
public static final ByteBuffer EMPTY_BYTE_BUFFER =
ByteBuffer.wrap(new byte[0]);
public static final String CONNECTION_RETRY_PROPERTY =
"cassandra.connectionretries";
public static final String CONNECTION_RETRY_PROPERTY_DEFAULT = "300";
public static final String OPERATION_RETRY_PROPERTY =
"cassandra.operationretries";
public static final String OPERATION_RETRY_PROPERTY_DEFAULT = "300";
public static final String USERNAME_PROPERTY = "cassandra.username";
public static final String PASSWORD_PROPERTY = "cassandra.password";
public static final String COLUMN_FAMILY_PROPERTY = "cassandra.columnfamily";
public static final String COLUMN_FAMILY_PROPERTY_DEFAULT = "data";
private int connectionRetries;
private int operationRetries;
private String columnFamily;
private TTransport tr;
private Cassandra.Client client;
private boolean debug = false;
private String tableName = "";
private Exception errorexception = null;
private List<Mutation> mutations = new ArrayList<Mutation>();
private Map<String, List<Mutation>> mutationMap =
new HashMap<String, List<Mutation>>();
private Map<ByteBuffer, Map<String, List<Mutation>>> record =
new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
private ColumnParent parent;
/**
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
public void init() throws DBException {
String hosts = getProperties().getProperty("hosts");
if (hosts == null) {
throw new DBException(
"Required property \"hosts\" missing for CassandraClient");
}
columnFamily = getProperties().getProperty(COLUMN_FAMILY_PROPERTY,
COLUMN_FAMILY_PROPERTY_DEFAULT);
parent = new ColumnParent(columnFamily);
connectionRetries =
Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY,
CONNECTION_RETRY_PROPERTY_DEFAULT));
operationRetries =
Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY,
OPERATION_RETRY_PROPERTY_DEFAULT));
String username = getProperties().getProperty(USERNAME_PROPERTY);
String password = getProperties().getProperty(PASSWORD_PROPERTY);
debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false"));
String[] allhosts = hosts.split(",");
String myhost = allhosts[Utils.random().nextInt(allhosts.length)];
Exception connectexception = null;
for (int retry = 0; retry < connectionRetries; retry++) {
tr = new TFramedTransport(new TSocket(myhost, 9160));
TProtocol proto = new TBinaryProtocol(tr);
client = new Cassandra.Client(proto);
try {
tr.open();
connectexception = null;
break;
} catch (Exception e) {
connectexception = e;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (connectexception != null) {
System.err.println("Unable to connect to " + myhost + " after "
+ connectionRetries + " tries");
System.out.println("Unable to connect to " + myhost + " after "
+ connectionRetries + " tries");
throw new DBException(connectexception);
}
if (username != null && password != null) {
Map<String, String> cred = new HashMap<String, String>();
cred.put("username", username);
cred.put("password", password);
AuthenticationRequest req = new AuthenticationRequest(cred);
try {
client.login(req);
} catch (Exception e) {
throw new DBException(e);
}
}
}
/**
* Cleanup any state for this DB. Called once per DB instance; there is one DB
* instance per client thread.
*/
public void cleanup() throws DBException {
tr.close();
}
/**
* Read a record from the database. Each field/value pair from the result will
* be stored in a HashMap.
*
* @param table
* The name of the table
* @param key
* The record key of the record to read.
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error
*/
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
if (!tableName.equals(table)) {
try {
client.set_keyspace(table);
tableName = table;
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
return Status.ERROR;
}
}
for (int i = 0; i < operationRetries; i++) {
try {
SlicePredicate predicate;
if (fields == null) {
predicate = new SlicePredicate().setSlice_range(new SliceRange(
EMPTY_BYTE_BUFFER, EMPTY_BYTE_BUFFER, false, 1000000));
} else {
ArrayList<ByteBuffer> fieldlist =
new ArrayList<ByteBuffer>(fields.size());
for (String s : fields) {
fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8")));
}
predicate = new SlicePredicate().setColumn_names(fieldlist);
}
List<ColumnOrSuperColumn> results =
client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent,
predicate, ConsistencyLevel.ONE);
if (debug) {
System.out.print("Reading key: " + key);
}
Column column;
String name;
ByteIterator value;
for (ColumnOrSuperColumn oneresult : results) {
column = oneresult.column;
name = new String(column.name.array(),
column.name.position() + column.name.arrayOffset(),
column.name.remaining());
value = new ByteArrayByteIterator(column.value.array(),
column.value.position() + column.value.arrayOffset(),
column.value.remaining());
result.put(name, value);
if (debug) {
System.out.print("(" + name + "=" + value + ")");
}
}
if (debug) {
System.out.println();
}
return Status.OK;
} catch (Exception e) {
errorexception = e;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Status.ERROR;
}
/**
* Perform a range scan for a set of records in the database. Each field/value
* pair from the result will be stored in a HashMap.
*
* @param table
* The name of the table
* @param startkey
* The record key of the first record to read.
* @param recordcount
* The number of records to read
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one record
* @return Zero on success, a non-zero error code on error
*/
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
if (!tableName.equals(table)) {
try {
client.set_keyspace(table);
tableName = table;
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
return Status.ERROR;
}
}
for (int i = 0; i < operationRetries; i++) {
try {
SlicePredicate predicate;
if (fields == null) {
predicate = new SlicePredicate().setSlice_range(new SliceRange(
EMPTY_BYTE_BUFFER, EMPTY_BYTE_BUFFER, false, 1000000));
} else {
ArrayList<ByteBuffer> fieldlist =
new ArrayList<ByteBuffer>(fields.size());
for (String s : fields) {
fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8")));
}
predicate = new SlicePredicate().setColumn_names(fieldlist);
}
KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8"))
.setEnd_key(new byte[] {}).setCount(recordcount);
List<KeySlice> results = client.get_range_slices(parent, predicate, kr,
ConsistencyLevel.ONE);
if (debug) {
System.out.println("Scanning startkey: " + startkey);
}
HashMap<String, ByteIterator> tuple;
for (KeySlice oneresult : results) {
tuple = new HashMap<String, ByteIterator>();
Column column;
String name;
ByteIterator value;
for (ColumnOrSuperColumn onecol : oneresult.columns) {
column = onecol.column;
name = new String(column.name.array(),
column.name.position() + column.name.arrayOffset(),
column.name.remaining());
value = new ByteArrayByteIterator(column.value.array(),
column.value.position() + column.value.arrayOffset(),
column.value.remaining());
tuple.put(name, value);
if (debug) {
System.out.print("(" + name + "=" + value + ")");
}
}
result.add(tuple);
if (debug) {
System.out.println();
}
}
return Status.OK;
} catch (Exception e) {
errorexception = e;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Status.ERROR;
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table
* The name of the table
* @param key
* The record key of the record to write.
* @param values
* A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error
*/
public Status update(String table, String key,
HashMap<String, ByteIterator> values) {
return insert(table, key, values);
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key.
*
* @param table
* The name of the table
* @param key
* The record key of the record to insert.
* @param values
* A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error
*/
public Status insert(String table, String key,
HashMap<String, ByteIterator> values) {
if (!tableName.equals(table)) {
try {
client.set_keyspace(table);
tableName = table;
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
return Status.ERROR;
}
}
for (int i = 0; i < operationRetries; i++) {
mutations.clear();
mutationMap.clear();
record.clear();
if (debug) {
System.out.println("Inserting key: " + key);
}
try {
ByteBuffer wrappedKey = ByteBuffer.wrap(key.getBytes("UTF-8"));
Column col;
ColumnOrSuperColumn column;
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
col = new Column();
col.setName(ByteBuffer.wrap(entry.getKey().getBytes("UTF-8")));
col.setValue(ByteBuffer.wrap(entry.getValue().toArray()));
col.setTimestamp(System.currentTimeMillis());
column = new ColumnOrSuperColumn();
column.setColumn(col);
mutations.add(new Mutation().setColumn_or_supercolumn(column));
}
mutationMap.put(columnFamily, mutations);
record.put(wrappedKey, mutationMap);
client.batch_mutate(record, ConsistencyLevel.ONE);
return Status.OK;
} catch (Exception e) {
errorexception = e;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Status.ERROR;
}
/**
* Delete a record from the database.
*
* @param table
* The name of the table
* @param key
* The record key of the record to delete.
* @return Zero on success, a non-zero error code on error
*/
public Status delete(String table, String key) {
if (!tableName.equals(table)) {
try {
client.set_keyspace(table);
tableName = table;
} catch (Exception e) {
e.printStackTrace();
e.printStackTrace(System.out);
return Status.ERROR;
}
}
for (int i = 0; i < operationRetries; i++) {
try {
client.remove(ByteBuffer.wrap(key.getBytes("UTF-8")),
new ColumnPath(columnFamily), System.currentTimeMillis(),
ConsistencyLevel.ONE);
if (debug) {
System.out.println("Delete key: " + key);
}
return Status.OK;
} catch (Exception e) {
errorexception = e;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Status.ERROR;
}
public static void main(String[] args) {
CassandraClient8 cli = new CassandraClient8();
Properties props = new Properties();
props.setProperty("hosts", args[0]);
cli.setProperties(props);
try {
cli.init();
} catch (Exception e) {
e.printStackTrace();
System.exit(0);
}
HashMap<String, ByteIterator> vals = new HashMap<String, ByteIterator>();
vals.put("age", new StringByteIterator("57"));
vals.put("middlename", new StringByteIterator("bradley"));
vals.put("favoritecolor", new StringByteIterator("blue"));
Status res = cli.insert("usertable", "BrianFrankCooper", vals);
System.out.println("Result of insert: " + res);
HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>();
HashSet<String> fields = new HashSet<String>();
fields.add("middlename");
fields.add("age");
fields.add("favoritecolor");
res = cli.read("usertable", "BrianFrankCooper", null, result);
System.out.println("Result of read: " + res);
for (Map.Entry<String, ByteIterator> entry : result.entrySet()) {
System.out.println("[" + entry.getKey() + "]=[" + entry.getValue() + "]");
}
res = cli.delete("usertable", "BrianFrankCooper");
System.out.println("Result of delete: " + res);
}
}
......@@ -17,7 +17,7 @@
/**
* The YCSB binding for <a href="http://cassandra.apache.org/">Cassandra</a>
* versions 0.7, 0.8, and 1.0.X.
* 2.1+ via CQL.
*/
package com.yahoo.ycsb.db;
<!--
Copyright (c) 2015 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
# Apache Cassandra 2.x CQL binding
Binding for [Apache Cassandra](http://cassandra.apache.org), using the CQL API
via the [DataStax
driver](http://docs.datastax.com/en/developer/java-driver/2.1/java-driver/whatsNew2.html).
To run against the (deprecated) Cassandra Thrift API, use the `cassandra-10` binding.
## Creating a table for use with YCSB
For keyspace `ycsb`, table `usertable`:
cqlsh> create keyspace ycsb
WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 3 };
cqlsh> USE ycsb;
cqlsh> create table usertable (
y_id varchar primary key,
field0 varchar,
field1 varchar,
field2 varchar,
field3 varchar,
field4 varchar,
field5 varchar,
field6 varchar,
field7 varchar,
field8 varchar,
field9 varchar);
**Note that `replication_factor` and consistency levels (below) will affect performance.**
## Cassandra Configuration Parameters
- `hosts` (**required**)
- Cassandra nodes to connect to.
- No default.
* `port`
* CQL port for communicating with Cassandra cluster.
* Default is `9042`.
- `cassandra.keyspace`
Keyspace name - must match the keyspace for the table created (see above).
See http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html for details.
- Default value is `ycsb`
- `cassandra.username`
- `cassandra.password`
- Optional user name and password for authentication. See http://docs.datastax.com/en/cassandra/2.0/cassandra/security/security_config_native_authenticate_t.html for details.
* `cassandra.readconsistencylevel`
* `cassandra.writeconsistencylevel`
* Default value is `ONE`
- Consistency level for reads and writes, respectively. See the [DataStax documentation](http://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html) for details.
* *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above.
* `cassandra.maxconnections`
* `cassandra.coreconnections`
* Defaults for max and core connections can be found here: https://datastax.github.io/java-driver/2.1.8/features/pooling/#pool-size. Cassandra 2.0.X falls under protocol V2, Cassandra 2.1+ falls under protocol V3.
* `cassandra.connecttimeoutmillis`
* `cassandra.readtimeoutmillis`
* Defaults for connect and read timeouts can be found here: https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/SocketOptions.html.
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2012-2016 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.11.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>
<artifactId>cassandra2-binding</artifactId>
<name>Cassandra 2.1+ DB Binding</name>
<packaging>jar</packaging>
<properties>
<!-- Skip tests by default. will be activated by jdk8 profile -->
<skipTests>true</skipTests>
</properties>
<dependencies>
<!-- CQL driver -->
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra2.cql.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<version>3.0.0.1</version>
<classifier>shaded</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<!-- Cassandra 2.2+ requires JDK8 to run, so none of our tests
will work unless we're using jdk8.
-->
<profile>
<id>jdk8-tests</id>
<activation>
<jdk>1.8</jdk>
</activation>
<properties>
<skipTests>false</skipTests>
</properties>
</profile>
</profiles>
</project>
/**
* Copyright (c) 2013-2015 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License. See accompanying LICENSE file.
*
* Submitted by Chrisjan Matser on 10/11/2010.
*/
package com.yahoo.ycsb.db;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Cassandra 2.x CQL client.
*
* See {@code cassandra2/README.md} for details.
*
* @author cmatser
*/
public class CassandraCQLClient extends DB {
private static Cluster cluster = null;
private static Session session = null;
private static ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE;
private static ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE;
public static final String YCSB_KEY = "y_id";
public static final String KEYSPACE_PROPERTY = "cassandra.keyspace";
public static final String KEYSPACE_PROPERTY_DEFAULT = "ycsb";
public static final String USERNAME_PROPERTY = "cassandra.username";
public static final String PASSWORD_PROPERTY = "cassandra.password";
public static final String HOSTS_PROPERTY = "hosts";
public static final String PORT_PROPERTY = "port";
public static final String PORT_PROPERTY_DEFAULT = "9042";
public static final String READ_CONSISTENCY_LEVEL_PROPERTY =
"cassandra.readconsistencylevel";
public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY =
"cassandra.writeconsistencylevel";
public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
public static final String MAX_CONNECTIONS_PROPERTY =
"cassandra.maxconnections";
public static final String CORE_CONNECTIONS_PROPERTY =
"cassandra.coreconnections";
public static final String CONNECT_TIMEOUT_MILLIS_PROPERTY =
"cassandra.connecttimeoutmillis";
public static final String READ_TIMEOUT_MILLIS_PROPERTY =
"cassandra.readtimeoutmillis";
/**
* Count the number of times initialized to teardown on the last
* {@link #cleanup()}.
*/
private static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
private static boolean debug = false;
/**
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
@Override
public void init() throws DBException {
// Keep track of number of calls to init (for later cleanup)
INIT_COUNT.incrementAndGet();
// Synchronized so that we only have a single
// cluster/session instance for all the threads.
synchronized (INIT_COUNT) {
// Check if the cluster has already been initialized
if (cluster != null) {
return;
}
try {
debug =
Boolean.parseBoolean(getProperties().getProperty("debug", "false"));
String host = getProperties().getProperty(HOSTS_PROPERTY);
if (host == null) {
throw new DBException(String.format(
"Required property \"%s\" missing for CassandraCQLClient",
HOSTS_PROPERTY));
}
String[] hosts = host.split(",");
String port = getProperties().getProperty(PORT_PROPERTY, PORT_PROPERTY_DEFAULT);
String username = getProperties().getProperty(USERNAME_PROPERTY);
String password = getProperties().getProperty(PASSWORD_PROPERTY);
String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY,
KEYSPACE_PROPERTY_DEFAULT);
readConsistencyLevel = ConsistencyLevel.valueOf(
getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY,
READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
writeConsistencyLevel = ConsistencyLevel.valueOf(
getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY,
WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
if ((username != null) && !username.isEmpty()) {
cluster = Cluster.builder().withCredentials(username, password)
.withPort(Integer.valueOf(port)).addContactPoints(hosts).build();
} else {
cluster = Cluster.builder().withPort(Integer.valueOf(port))
.addContactPoints(hosts).build();
}
String maxConnections = getProperties().getProperty(
MAX_CONNECTIONS_PROPERTY);
if (maxConnections != null) {
cluster.getConfiguration().getPoolingOptions()
.setMaxConnectionsPerHost(HostDistance.LOCAL,
Integer.valueOf(maxConnections));
}
String coreConnections = getProperties().getProperty(
CORE_CONNECTIONS_PROPERTY);
if (coreConnections != null) {
cluster.getConfiguration().getPoolingOptions()
.setCoreConnectionsPerHost(HostDistance.LOCAL,
Integer.valueOf(coreConnections));
}
String connectTimoutMillis = getProperties().getProperty(
CONNECT_TIMEOUT_MILLIS_PROPERTY);
if (connectTimoutMillis != null) {
cluster.getConfiguration().getSocketOptions()
.setConnectTimeoutMillis(Integer.valueOf(connectTimoutMillis));
}
String readTimoutMillis = getProperties().getProperty(
READ_TIMEOUT_MILLIS_PROPERTY);
if (readTimoutMillis != null) {
cluster.getConfiguration().getSocketOptions()
.setReadTimeoutMillis(Integer.valueOf(readTimoutMillis));
}
Metadata metadata = cluster.getMetadata();
System.err.printf("Connected to cluster: %s\n",
metadata.getClusterName());
for (Host discoveredHost : metadata.getAllHosts()) {
System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n",
discoveredHost.getDatacenter(), discoveredHost.getAddress(),
discoveredHost.getRack());
}
session = cluster.connect(keyspace);
} catch (Exception e) {
throw new DBException(e);
}
} // synchronized
}
/**
* Cleanup any state for this DB. Called once per DB instance; there is one DB
* instance per client thread.
*/
@Override
public void cleanup() throws DBException {
synchronized (INIT_COUNT) {
final int curInitCount = INIT_COUNT.decrementAndGet();
if (curInitCount <= 0) {
session.close();
cluster.close();
cluster = null;
session = null;
}
if (curInitCount < 0) {
// This should never happen.
throw new DBException(
String.format("initCount is negative: %d", curInitCount));
}
}
}
/**
* Read a record from the database. Each field/value pair from the result will
* be stored in a HashMap.
*
* @param table
* The name of the table
* @param key
* The record key of the record to read.
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
try {
Statement stmt;
Select.Builder selectBuilder;
if (fields == null) {
selectBuilder = QueryBuilder.select().all();
} else {
selectBuilder = QueryBuilder.select();
for (String col : fields) {
((Select.Selection) selectBuilder).column(col);
}
}
stmt = selectBuilder.from(table).where(QueryBuilder.eq(YCSB_KEY, key))
.limit(1);
stmt.setConsistencyLevel(readConsistencyLevel);
if (debug) {
System.out.println(stmt.toString());
}
ResultSet rs = session.execute(stmt);
if (rs.isExhausted()) {
return Status.NOT_FOUND;
}
// Should be only 1 row
Row row = rs.one();
ColumnDefinitions cd = row.getColumnDefinitions();
for (ColumnDefinitions.Definition def : cd) {
ByteBuffer val = row.getBytesUnsafe(def.getName());
if (val != null) {
result.put(def.getName(), new ByteArrayByteIterator(val.array()));
} else {
result.put(def.getName(), null);
}
}
return Status.OK;
} catch (Exception e) {
e.printStackTrace();
System.out.println("Error reading key: " + key);
return Status.ERROR;
}
}
/**
* Perform a range scan for a set of records in the database. Each field/value
* pair from the result will be stored in a HashMap.
*
* Cassandra CQL uses "token" method for range scan which doesn't always yield
* intuitive results.
*
* @param table
* The name of the table
* @param startkey
* The record key of the first record to read.
* @param recordcount
* The number of records to read
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
try {
Statement stmt;
Select.Builder selectBuilder;
if (fields == null) {
selectBuilder = QueryBuilder.select().all();
} else {
selectBuilder = QueryBuilder.select();
for (String col : fields) {
((Select.Selection) selectBuilder).column(col);
}
}
stmt = selectBuilder.from(table);
// The statement builder is not setup right for tokens.
// So, we need to build it manually.
String initialStmt = stmt.toString();
StringBuilder scanStmt = new StringBuilder();
scanStmt.append(initialStmt.substring(0, initialStmt.length() - 1));
scanStmt.append(" WHERE ");
scanStmt.append(QueryBuilder.token(YCSB_KEY));
scanStmt.append(" >= ");
scanStmt.append("token('");
scanStmt.append(startkey);
scanStmt.append("')");
scanStmt.append(" LIMIT ");
scanStmt.append(recordcount);
stmt = new SimpleStatement(scanStmt.toString());
stmt.setConsistencyLevel(readConsistencyLevel);
if (debug) {
System.out.println(stmt.toString());
}
ResultSet rs = session.execute(stmt);
HashMap<String, ByteIterator> tuple;
while (!rs.isExhausted()) {
Row row = rs.one();
tuple = new HashMap<String, ByteIterator>();
ColumnDefinitions cd = row.getColumnDefinitions();
for (ColumnDefinitions.Definition def : cd) {
ByteBuffer val = row.getBytesUnsafe(def.getName());
if (val != null) {
tuple.put(def.getName(), new ByteArrayByteIterator(val.array()));
} else {
tuple.put(def.getName(), null);
}
}
result.add(tuple);
}
return Status.OK;
} catch (Exception e) {
e.printStackTrace();
System.out.println("Error scanning with startkey: " + startkey);
return Status.ERROR;
}
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table
* The name of the table
* @param key
* The record key of the record to write.
* @param values
* A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status update(String table, String key,
HashMap<String, ByteIterator> values) {
// Insert and updates provide the same functionality
return insert(table, key, values);
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key.
*
* @param table
* The name of the table
* @param key
* The record key of the record to insert.
* @param values
* A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status insert(String table, String key,
HashMap<String, ByteIterator> values) {
try {
Insert insertStmt = QueryBuilder.insertInto(table);
// Add key
insertStmt.value(YCSB_KEY, key);
// Add fields
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
Object value;
ByteIterator byteIterator = entry.getValue();
value = byteIterator.toString();
insertStmt.value(entry.getKey(), value);
}
insertStmt.setConsistencyLevel(writeConsistencyLevel);
if (debug) {
System.out.println(insertStmt.toString());
}
session.execute(insertStmt);
return Status.OK;
} catch (Exception e) {
e.printStackTrace();
}
return Status.ERROR;
}
/**
* Delete a record from the database.
*
* @param table
* The name of the table
* @param key
* The record key of the record to delete.
* @return Zero on success, a non-zero error code on error
*/
@Override
public Status delete(String table, String key) {
try {
Statement stmt;
stmt = QueryBuilder.delete().from(table)
.where(QueryBuilder.eq(YCSB_KEY, key));
stmt.setConsistencyLevel(writeConsistencyLevel);
if (debug) {
System.out.println(stmt.toString());
}
session.execute(stmt);
return Status.OK;
} catch (Exception e) {
e.printStackTrace();
System.out.println("Error deleting key: " + key);
}
return Status.ERROR;
}
}
/*
* Copyright (c) 2014, Yahoo!, Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
* The YCSB binding for <a href="http://cassandra.apache.org/">Cassandra</a>
* 2.1+ via CQL.
*/
package com.yahoo.ycsb.db;
......@@ -73,9 +73,7 @@ LICENSE file.
<hbase098.version>0.98.14-hadoop2</hbase098.version>
<hbase10.version>1.0.2</hbase10.version>
<accumulo.version>1.6.0</accumulo.version>
<cassandra.version>1.2.9</cassandra.version>
<cassandra.cql.version>1.0.3</cassandra.cql.version>
<cassandra2.cql.version>3.0.0</cassandra2.cql.version>
<cassandra.cql.version>3.0.0</cassandra.cql.version>
<geode.version>1.0.0-incubating.M2</geode.version>
<googlebigtable.version>0.2.3</googlebigtable.version>
<infinispan.version>7.2.2.Final</infinispan.version>
......@@ -108,7 +106,6 @@ LICENSE file.
<module>aerospike</module>
<module>asynchbase</module>
<module>cassandra</module>
<module>cassandra2</module>
<module>couchbase</module>
<module>couchbase2</module>
<module>distribution</module>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment