diff --git a/cassandra/pom.xml b/cassandra/pom.xml index 0882f42377fc7e26d85241e19946340081ba2f29..07155bc11e6e58169907bab70df6d2f3ff9cf607 100644 --- a/cassandra/pom.xml +++ b/cassandra/pom.xml @@ -48,4 +48,28 @@ LICENSE file. <scope>provided</scope> </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.15</version> + <configuration> + <consoleOutput>true</consoleOutput> + <configLocation>../checkstyle.xml</configLocation> + <failOnViolation>true</failOnViolation> + <failsOnError>true</failsOnError> + </configuration> + <executions> + <execution> + <id>validate</id> + <phase>validate</phase> + <goals> + <goal>checkstyle</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> diff --git a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java index 578a5a8f50ef28778d996e5181c1962d1d42dac2..33fdcea04c079e1e107617dd0c556d2de0315c9b 100755 --- a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java +++ b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java @@ -30,380 +30,409 @@ import java.util.HashMap; import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; - /** * Tested with Cassandra 2.0, CQL client for YCSB framework * - * See {@code cassandra2} for a version compatible with Cassandra 2.1+. - * See {@code cassandra2/README.md} for details. + * See {@code cassandra2} for a version compatible with Cassandra 2.1+. See + * {@code cassandra2/README.md} for details. * * @author cmatser */ public class CassandraCQLClient extends DB { - private static Cluster cluster = null; - private static Session session = null; - - private static ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE; - private static ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE; + private static Cluster cluster = null; + private static Session session = null; - public static final int OK = 0; - public static final int ERR = -1; - - public static final String YCSB_KEY = "y_id"; - public static final String KEYSPACE_PROPERTY = "cassandra.keyspace"; - public static final String KEYSPACE_PROPERTY_DEFAULT = "ycsb"; - public static final String USERNAME_PROPERTY = "cassandra.username"; - public static final String PASSWORD_PROPERTY = "cassandra.password"; - - public static final String HOSTS_PROPERTY = "hosts"; - public static final String PORT_PROPERTY = "port"; - - - public static final String READ_CONSISTENCY_LEVEL_PROPERTY = "cassandra.readconsistencylevel"; - public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; - public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = "cassandra.writeconsistencylevel"; - public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; - - /** Count the number of times initialized to teardown on the last {@link #cleanup()}. */ - private static final AtomicInteger initCount = new AtomicInteger(0); - - private static boolean _debug = false; - - /** - * Initialize any state for this DB. Called once per DB instance; there is - * one DB instance per client thread. - */ - @Override - public void init() throws DBException { - - //Keep track of number of calls to init (for later cleanup) - initCount.incrementAndGet(); - - //Synchronized so that we only have a single - // cluster/session instance for all the threads. - synchronized (initCount) { - - //Check if the cluster has already been initialized - if (cluster != null) { - return; - } - - try { - - _debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); - - String host = getProperties().getProperty(HOSTS_PROPERTY); - if (host == null) { - throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", HOSTS_PROPERTY)); - } - String hosts[] = host.split(","); - String port = getProperties().getProperty("port", "9042"); - if (port == null) { - throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", PORT_PROPERTY)); - } - - String username = getProperties().getProperty(USERNAME_PROPERTY); - String password = getProperties().getProperty(PASSWORD_PROPERTY); - - String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY, KEYSPACE_PROPERTY_DEFAULT); - - readConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - writeConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - - // public void connect(String node) {} - if ((username != null) && !username.isEmpty()) { - cluster = Cluster.builder() - .withCredentials(username, password) - .withPort(Integer.valueOf(port)) - .addContactPoints(hosts).build(); - } - else { - cluster = Cluster.builder() - .withPort(Integer.valueOf(port)) - .addContactPoints(hosts).build(); - } - - //Update number of connections based on threads - int threadcount = Integer.parseInt(getProperties().getProperty("threadcount","1")); - cluster.getConfiguration().getPoolingOptions().setMaxConnectionsPerHost(HostDistance.LOCAL, threadcount); - - //Set connection timeout 3min (default is 5s) - cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis(3*60*1000); - //Set read (execute) timeout 3min (default is 12s) - cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(3*60*1000); - - Metadata metadata = cluster.getMetadata(); - System.out.printf("Connected to cluster: %s\n", metadata.getClusterName()); - - for (Host discoveredHost : metadata.getAllHosts()) { - System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", - discoveredHost.getDatacenter(), - discoveredHost.getAddress(), - discoveredHost.getRack()); - } - - session = cluster.connect(keyspace); - - } catch (Exception e) { - throw new DBException(e); - } - }//synchronized - } + private static ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE; + private static ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE; - /** - * Cleanup any state for this DB. Called once per DB instance; there is one - * DB instance per client thread. - */ - @Override - public void cleanup() throws DBException { - if (initCount.decrementAndGet() <= 0) { - cluster.shutdown(); - } - } + public static final int OK = 0; + public static final int ERR = -1; + + public static final String YCSB_KEY = "y_id"; + public static final String KEYSPACE_PROPERTY = "cassandra.keyspace"; + public static final String KEYSPACE_PROPERTY_DEFAULT = "ycsb"; + public static final String USERNAME_PROPERTY = "cassandra.username"; + public static final String PASSWORD_PROPERTY = "cassandra.password"; + + public static final String HOSTS_PROPERTY = "hosts"; + public static final String PORT_PROPERTY = "port"; + + public static final String READ_CONSISTENCY_LEVEL_PROPERTY = + "cassandra.readconsistencylevel"; + public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; + public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = + "cassandra.writeconsistencylevel"; + public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; + + /** + * Count the number of times initialized to teardown on the last + * {@link #cleanup()}. + */ + private static final AtomicInteger INIT_COUNT = new AtomicInteger(0); + + private static boolean debug = false; + + /** + * Initialize any state for this DB. Called once per DB instance; there is one + * DB instance per client thread. + */ + @Override + public void init() throws DBException { + + // Keep track of number of calls to init (for later cleanup) + INIT_COUNT.incrementAndGet(); + + // Synchronized so that we only have a single + // cluster/session instance for all the threads. + synchronized (INIT_COUNT) { + + // Check if the cluster has already been initialized + if (cluster != null) { + return; + } + + try { + + debug = + Boolean.parseBoolean(getProperties().getProperty("debug", "false")); - /** - * Read a record from the database. Each field/value pair from the result - * will be stored in a HashMap. - * - * @param table The name of the table - * @param key The record key of the record to read. - * @param fields The list of fields to read, or null for all of them - * @param result A HashMap of field/value pairs for the result - * @return Zero on success, a non-zero error code on error - */ - @Override - public int read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) { - - try { - Statement stmt; - Select.Builder selectBuilder; - - if (fields == null) { - selectBuilder = QueryBuilder.select().all(); - } - else { - selectBuilder = QueryBuilder.select(); - for (String col : fields) { - ((Select.Selection) selectBuilder).column(col); - } - } - - stmt = selectBuilder.from(table).where(QueryBuilder.eq(YCSB_KEY, key)).limit(1); - stmt.setConsistencyLevel(readConsistencyLevel); - - if (_debug) { - System.out.println(stmt.toString()); - } - - ResultSet rs = session.execute(stmt); - - //Should be only 1 row - if (!rs.isExhausted()) { - Row row = rs.one(); - ColumnDefinitions cd = row.getColumnDefinitions(); - - for (ColumnDefinitions.Definition def : cd) { - ByteBuffer val = row.getBytesUnsafe(def.getName()); - if (val != null) { - result.put(def.getName(), - new ByteArrayByteIterator(val.array())); - } - else { - result.put(def.getName(), null); - } - } - - } - - return OK; - - } catch (Exception e) { - e.printStackTrace(); - System.out.println("Error reading key: " + key); - return ERR; + String host = getProperties().getProperty(HOSTS_PROPERTY); + if (host == null) { + throw new DBException(String.format( + "Required property \"%s\" missing for CassandraCQLClient", + HOSTS_PROPERTY)); + } + String[] hosts = host.split(","); + String port = getProperties().getProperty("port", "9042"); + if (port == null) { + throw new DBException(String.format( + "Required property \"%s\" missing for CassandraCQLClient", + PORT_PROPERTY)); } - } + String username = getProperties().getProperty(USERNAME_PROPERTY); + String password = getProperties().getProperty(PASSWORD_PROPERTY); + + String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY, + KEYSPACE_PROPERTY_DEFAULT); + + readConsistencyLevel = ConsistencyLevel.valueOf( + getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, + READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); + writeConsistencyLevel = ConsistencyLevel.valueOf( + getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, + WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); + + // public void connect(String node) {} + if ((username != null) && !username.isEmpty()) { + cluster = Cluster.builder().withCredentials(username, password) + .withPort(Integer.valueOf(port)).addContactPoints(hosts).build(); + } else { + cluster = Cluster.builder().withPort(Integer.valueOf(port)) + .addContactPoints(hosts).build(); + } - /** - * Perform a range scan for a set of records in the database. Each - * field/value pair from the result will be stored in a HashMap. - * - * Cassandra CQL uses "token" method for range scan which doesn't always - * yield intuitive results. - * - * @param table The name of the table - * @param startkey The record key of the first record to read. - * @param recordcount The number of records to read - * @param fields The list of fields to read, or null for all of them - * @param result A Vector of HashMaps, where each HashMap is a set - * field/value pairs for one record - * @return Zero on success, a non-zero error code on error - */ - @Override - public int scan(String table, String startkey, int recordcount, Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { - - try { - Statement stmt; - Select.Builder selectBuilder; - - if (fields == null) { - selectBuilder = QueryBuilder.select().all(); - } - else { - selectBuilder = QueryBuilder.select(); - for (String col : fields) { - ((Select.Selection) selectBuilder).column(col); - } - } - - stmt = selectBuilder.from(table); - - //The statement builder is not setup right for tokens. - // So, we need to build it manually. - String initialStmt = stmt.toString(); - StringBuilder scanStmt = new StringBuilder(); - scanStmt.append( - initialStmt.substring(0, initialStmt.length()-1)); - scanStmt.append(" WHERE "); - scanStmt.append(QueryBuilder.token(YCSB_KEY)); - scanStmt.append(" >= "); - scanStmt.append("token('"); - scanStmt.append(startkey); - scanStmt.append("')"); - scanStmt.append(" LIMIT "); - scanStmt.append(recordcount); - - stmt = new SimpleStatement(scanStmt.toString()); - stmt.setConsistencyLevel(readConsistencyLevel); - - if (_debug) { - System.out.println(stmt.toString()); - } - - ResultSet rs = session.execute(stmt); - - HashMap<String, ByteIterator> tuple; - while (!rs.isExhausted()) { - Row row = rs.one(); - tuple = new HashMap<String, ByteIterator> (); - - ColumnDefinitions cd = row.getColumnDefinitions(); - - for (ColumnDefinitions.Definition def : cd) { - ByteBuffer val = row.getBytesUnsafe(def.getName()); - if (val != null) { - tuple.put(def.getName(), - new ByteArrayByteIterator(val.array())); - } - else { - tuple.put(def.getName(), null); - } - } - - result.add(tuple); - } - - return OK; - - } catch (Exception e) { - e.printStackTrace(); - System.out.println("Error scanning with startkey: " + startkey); - return ERR; + // Update number of connections based on threads + int threadcount = + Integer.parseInt(getProperties().getProperty("threadcount", "1")); + cluster.getConfiguration().getPoolingOptions() + .setMaxConnectionsPerHost(HostDistance.LOCAL, threadcount); + + // Set connection timeout 3min (default is 5s) + cluster.getConfiguration().getSocketOptions() + .setConnectTimeoutMillis(3 * 60 * 1000); + // Set read (execute) timeout 3min (default is 12s) + cluster.getConfiguration().getSocketOptions() + .setReadTimeoutMillis(3 * 60 * 1000); + + Metadata metadata = cluster.getMetadata(); + System.out.printf("Connected to cluster: %s\n", + metadata.getClusterName()); + + for (Host discoveredHost : metadata.getAllHosts()) { + System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", + discoveredHost.getDatacenter(), discoveredHost.getAddress(), + discoveredHost.getRack()); } + session = cluster.connect(keyspace); + + } catch (Exception e) { + throw new DBException(e); + } + } // synchronized + } + + /** + * Cleanup any state for this DB. Called once per DB instance; there is one DB + * instance per client thread. + */ + @Override + public void cleanup() throws DBException { + if (INIT_COUNT.decrementAndGet() <= 0) { + cluster.shutdown(); } + } + + /** + * Read a record from the database. Each field/value pair from the result will + * be stored in a HashMap. + * + * @param table + * The name of the table + * @param key + * The record key of the record to read. + * @param fields + * The list of fields to read, or null for all of them + * @param result + * A HashMap of field/value pairs for the result + * @return Zero on success, a non-zero error code on error + */ + @Override + public int read(String table, String key, Set<String> fields, + HashMap<String, ByteIterator> result) { + + try { + Statement stmt; + Select.Builder selectBuilder; + + if (fields == null) { + selectBuilder = QueryBuilder.select().all(); + } else { + selectBuilder = QueryBuilder.select(); + for (String col : fields) { + ((Select.Selection) selectBuilder).column(col); + } + } + + stmt = selectBuilder.from(table).where(QueryBuilder.eq(YCSB_KEY, key)) + .limit(1); + stmt.setConsistencyLevel(readConsistencyLevel); + + if (debug) { + System.out.println(stmt.toString()); + } + + ResultSet rs = session.execute(stmt); + + // Should be only 1 row + if (!rs.isExhausted()) { + Row row = rs.one(); + ColumnDefinitions cd = row.getColumnDefinitions(); + + for (ColumnDefinitions.Definition def : cd) { + ByteBuffer val = row.getBytesUnsafe(def.getName()); + if (val != null) { + result.put(def.getName(), new ByteArrayByteIterator(val.array())); + } else { + result.put(def.getName(), null); + } + } - /** - * Update a record in the database. Any field/value pairs in the specified - * values HashMap will be written into the record with the specified record - * key, overwriting any existing values with the same field name. - * - * @param table The name of the table - * @param key The record key of the record to write. - * @param values A HashMap of field/value pairs to update in the record - * @return Zero on success, a non-zero error code on error - */ - @Override - public int update(String table, String key, HashMap<String, ByteIterator> values) { - //Insert and updates provide the same functionality - return insert(table, key, values); - } + } - /** - * Insert a record in the database. Any field/value pairs in the specified - * values HashMap will be written into the record with the specified record - * key. - * - * @param table The name of the table - * @param key The record key of the record to insert. - * @param values A HashMap of field/value pairs to insert in the record - * @return Zero on success, a non-zero error code on error - */ - @Override - public int insert(String table, String key, HashMap<String, ByteIterator> values) { - - try { - Insert insertStmt = QueryBuilder.insertInto(table); - - //Add key - insertStmt.value(YCSB_KEY, key); - - //Add fields - for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { - Object value; - ByteIterator byteIterator = entry.getValue(); - value = byteIterator.toString(); - - insertStmt.value(entry.getKey(), value); - } - - insertStmt.setConsistencyLevel(writeConsistencyLevel); - - if (_debug) { - System.out.println(insertStmt.toString()); - } - - ResultSet rs = session.execute(insertStmt); - - return OK; - } catch (Exception e) { - e.printStackTrace(); - } + return OK; - return ERR; + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Error reading key: " + key); + return ERR; } - /** - * Delete a record from the database. - * - * @param table The name of the table - * @param key The record key of the record to delete. - * @return Zero on success, a non-zero error code on error - */ - @Override - public int delete(String table, String key) { - - try { - Statement stmt; + } + + /** + * Perform a range scan for a set of records in the database. Each field/value + * pair from the result will be stored in a HashMap. + * + * Cassandra CQL uses "token" method for range scan which doesn't always yield + * intuitive results. + * + * @param table + * The name of the table + * @param startkey + * The record key of the first record to read. + * @param recordcount + * The number of records to read + * @param fields + * The list of fields to read, or null for all of them + * @param result + * A Vector of HashMaps, where each HashMap is a set field/value + * pairs for one record + * @return Zero on success, a non-zero error code on error + */ + @Override + public int scan(String table, String startkey, int recordcount, + Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { + + try { + Statement stmt; + Select.Builder selectBuilder; + + if (fields == null) { + selectBuilder = QueryBuilder.select().all(); + } else { + selectBuilder = QueryBuilder.select(); + for (String col : fields) { + ((Select.Selection) selectBuilder).column(col); + } + } + + stmt = selectBuilder.from(table); + + // The statement builder is not setup right for tokens. + // So, we need to build it manually. + String initialStmt = stmt.toString(); + StringBuilder scanStmt = new StringBuilder(); + scanStmt.append(initialStmt.substring(0, initialStmt.length() - 1)); + scanStmt.append(" WHERE "); + scanStmt.append(QueryBuilder.token(YCSB_KEY)); + scanStmt.append(" >= "); + scanStmt.append("token('"); + scanStmt.append(startkey); + scanStmt.append("')"); + scanStmt.append(" LIMIT "); + scanStmt.append(recordcount); + + stmt = new SimpleStatement(scanStmt.toString()); + stmt.setConsistencyLevel(readConsistencyLevel); + + if (debug) { + System.out.println(stmt.toString()); + } + + ResultSet rs = session.execute(stmt); + + HashMap<String, ByteIterator> tuple; + while (!rs.isExhausted()) { + Row row = rs.one(); + tuple = new HashMap<String, ByteIterator>(); + + ColumnDefinitions cd = row.getColumnDefinitions(); + + for (ColumnDefinitions.Definition def : cd) { + ByteBuffer val = row.getBytesUnsafe(def.getName()); + if (val != null) { + tuple.put(def.getName(), new ByteArrayByteIterator(val.array())); + } else { + tuple.put(def.getName(), null); + } + } - stmt = QueryBuilder.delete().from(table).where(QueryBuilder.eq(YCSB_KEY, key)); - stmt.setConsistencyLevel(writeConsistencyLevel); + result.add(tuple); + } - if (_debug) { - System.out.println(stmt.toString()); - } + return OK; - ResultSet rs = session.execute(stmt); + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Error scanning with startkey: " + startkey); + return ERR; + } - return OK; - } catch (Exception e) { - e.printStackTrace(); - System.out.println("Error deleting key: " + key); - } + } + + /** + * Update a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key, overwriting any existing values with the same field name. + * + * @param table + * The name of the table + * @param key + * The record key of the record to write. + * @param values + * A HashMap of field/value pairs to update in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public int update(String table, String key, + HashMap<String, ByteIterator> values) { + // Insert and updates provide the same functionality + return insert(table, key, values); + } + + /** + * Insert a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key. + * + * @param table + * The name of the table + * @param key + * The record key of the record to insert. + * @param values + * A HashMap of field/value pairs to insert in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public int insert(String table, String key, + HashMap<String, ByteIterator> values) { + + try { + Insert insertStmt = QueryBuilder.insertInto(table); + + // Add key + insertStmt.value(YCSB_KEY, key); + + // Add fields + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { + Object value; + ByteIterator byteIterator = entry.getValue(); + value = byteIterator.toString(); + + insertStmt.value(entry.getKey(), value); + } + + insertStmt.setConsistencyLevel(writeConsistencyLevel); + + if (debug) { + System.out.println(insertStmt.toString()); + } + + ResultSet rs = session.execute(insertStmt); + + return OK; + } catch (Exception e) { + e.printStackTrace(); + } - return ERR; + return ERR; + } + + /** + * Delete a record from the database. + * + * @param table + * The name of the table + * @param key + * The record key of the record to delete. + * @return Zero on success, a non-zero error code on error + */ + @Override + public int delete(String table, String key) { + + try { + Statement stmt; + + stmt = QueryBuilder.delete().from(table) + .where(QueryBuilder.eq(YCSB_KEY, key)); + stmt.setConsistencyLevel(writeConsistencyLevel); + + if (debug) { + System.out.println(stmt.toString()); + } + + ResultSet rs = session.execute(stmt); + + return OK; + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Error deleting key: " + key); } + return ERR; + } + } diff --git a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient10.java b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient10.java index 669cf0294da83164f93695f6cff883eb5f2cfe55..30349ef4de10ecba213228dedba1adaa58f15b0b 100644 --- a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient10.java +++ b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient10.java @@ -17,46 +17,57 @@ package com.yahoo.ycsb.db; -import com.yahoo.ycsb.*; - +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; -import java.util.HashMap; -import java.util.HashSet; import java.util.Vector; -import java.util.Random; -import java.util.Properties; -import java.nio.ByteBuffer; -import org.apache.thrift.transport.TTransport; +import org.apache.cassandra.thrift.AuthenticationRequest; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.Column; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnParent; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.KeyRange; +import org.apache.cassandra.thrift.KeySlice; +import org.apache.cassandra.thrift.Mutation; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.SliceRange; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.cassandra.thrift.*; +import org.apache.thrift.transport.TTransport; +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.StringByteIterator; +import com.yahoo.ycsb.Utils; //XXXX if we do replication, fix the consistency levels /** - * Cassandra 1.0.6 client for YCSB framework + * Cassandra 1.0.6 client for YCSB framework. */ -public class CassandraClient10 extends DB -{ - static Random random = new Random(); - public static final int Ok = 0; - public static final int Error = -1; - public static final ByteBuffer emptyByteBuffer = ByteBuffer.wrap(new byte[0]); - - public int ConnectionRetries; - public int OperationRetries; - public String column_family; - - public static final String CONNECTION_RETRY_PROPERTY = "cassandra.connectionretries"; +public class CassandraClient10 extends DB { + public static final int OK = 0; + public static final int ERROR = -1; + public static final ByteBuffer EMPTY_BYTE_BUFFER = + ByteBuffer.wrap(new byte[0]); + + public static final String CONNECTION_RETRY_PROPERTY = + "cassandra.connectionretries"; public static final String CONNECTION_RETRY_PROPERTY_DEFAULT = "300"; - public static final String OPERATION_RETRY_PROPERTY = "cassandra.operationretries"; + public static final String OPERATION_RETRY_PROPERTY = + "cassandra.operationretries"; public static final String OPERATION_RETRY_PROPERTY_DEFAULT = "300"; public static final String USERNAME_PROPERTY = "cassandra.username"; @@ -64,118 +75,127 @@ public class CassandraClient10 extends DB public static final String COLUMN_FAMILY_PROPERTY = "cassandra.columnfamily"; public static final String COLUMN_FAMILY_PROPERTY_DEFAULT = "data"; - - public static final String READ_CONSISTENCY_LEVEL_PROPERTY = "cassandra.readconsistencylevel"; + + public static final String READ_CONSISTENCY_LEVEL_PROPERTY = + "cassandra.readconsistencylevel"; public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; - public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = "cassandra.writeconsistencylevel"; + public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = + "cassandra.writeconsistencylevel"; public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; - public static final String SCAN_CONSISTENCY_LEVEL_PROPERTY = "cassandra.scanconsistencylevel"; + public static final String SCAN_CONSISTENCY_LEVEL_PROPERTY = + "cassandra.scanconsistencylevel"; public static final String SCAN_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; - public static final String DELETE_CONSISTENCY_LEVEL_PROPERTY = "cassandra.deleteconsistencylevel"; + public static final String DELETE_CONSISTENCY_LEVEL_PROPERTY = + "cassandra.deleteconsistencylevel"; public static final String DELETE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; + private int connectionRetries; + private int operationRetries; + private String columnFamily; - TTransport tr; - Cassandra.Client client; + private TTransport tr; + private Cassandra.Client client; - boolean _debug = false; + private boolean debug = false; - String _table = ""; - Exception errorexception = null; + private String tableName = ""; + private Exception errorexception = null; - List<Mutation> mutations = new ArrayList<Mutation>(); - Map<String, List<Mutation>> mutationMap = new HashMap<String, List<Mutation>>(); - Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); + private List<Mutation> mutations = new ArrayList<Mutation>(); + private Map<String, List<Mutation>> mutationMap = + new HashMap<String, List<Mutation>>(); + private Map<ByteBuffer, Map<String, List<Mutation>>> record = + new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); - ColumnParent parent; - - ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE; - ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE; - ConsistencyLevel scanConsistencyLevel = ConsistencyLevel.ONE; - ConsistencyLevel deleteConsistencyLevel = ConsistencyLevel.ONE; + private ColumnParent parent; + private ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE; + private ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE; + private ConsistencyLevel scanConsistencyLevel = ConsistencyLevel.ONE; + private ConsistencyLevel deleteConsistencyLevel = ConsistencyLevel.ONE; /** * Initialize any state for this DB. Called once per DB instance; there is one * DB instance per client thread. */ - public void init() throws DBException - { + public void init() throws DBException { String hosts = getProperties().getProperty("hosts"); - if (hosts == null) - { - throw new DBException("Required property \"hosts\" missing for CassandraClient"); + if (hosts == null) { + throw new DBException( + "Required property \"hosts\" missing for CassandraClient"); } - column_family = getProperties().getProperty(COLUMN_FAMILY_PROPERTY, COLUMN_FAMILY_PROPERTY_DEFAULT); - parent = new ColumnParent(column_family); + columnFamily = getProperties().getProperty(COLUMN_FAMILY_PROPERTY, + COLUMN_FAMILY_PROPERTY_DEFAULT); + parent = new ColumnParent(columnFamily); - ConnectionRetries = Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY, - CONNECTION_RETRY_PROPERTY_DEFAULT)); - OperationRetries = Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY, - OPERATION_RETRY_PROPERTY_DEFAULT)); + connectionRetries = + Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY, + CONNECTION_RETRY_PROPERTY_DEFAULT)); + operationRetries = + Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY, + OPERATION_RETRY_PROPERTY_DEFAULT)); String username = getProperties().getProperty(USERNAME_PROPERTY); String password = getProperties().getProperty(PASSWORD_PROPERTY); - readConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - writeConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - scanConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(SCAN_CONSISTENCY_LEVEL_PROPERTY, SCAN_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - deleteConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(DELETE_CONSISTENCY_LEVEL_PROPERTY, DELETE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - - - _debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); + readConsistencyLevel = ConsistencyLevel + .valueOf(getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, + READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); + writeConsistencyLevel = ConsistencyLevel + .valueOf(getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, + WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); + scanConsistencyLevel = ConsistencyLevel + .valueOf(getProperties().getProperty(SCAN_CONSISTENCY_LEVEL_PROPERTY, + SCAN_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); + deleteConsistencyLevel = ConsistencyLevel + .valueOf(getProperties().getProperty(DELETE_CONSISTENCY_LEVEL_PROPERTY, + DELETE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); + + debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); String[] allhosts = hosts.split(","); - String myhost = allhosts[random.nextInt(allhosts.length)]; + String myhost = allhosts[Utils.random().nextInt(allhosts.length)]; Exception connectexception = null; - for (int retry = 0; retry < ConnectionRetries; retry++) - { - tr = new TFramedTransport(new TSocket(myhost, Integer.parseInt(getProperties().getProperty("port","9160")))); + for (int retry = 0; retry < connectionRetries; retry++) { + tr = new TFramedTransport(new TSocket(myhost, + Integer.parseInt(getProperties().getProperty("port", "9160")))); TProtocol proto = new TBinaryProtocol(tr); client = new Cassandra.Client(proto); - try - { + try { tr.open(); connectexception = null; break; - } catch (Exception e) - { + } catch (Exception e) { connectexception = e; } - try - { + try { Thread.sleep(1000); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } - if (connectexception != null) - { - System.err.println("Unable to connect to " + myhost + " after " + ConnectionRetries - + " tries"); + if (connectexception != null) { + System.err.println("Unable to connect to " + myhost + " after " + + connectionRetries + " tries"); throw new DBException(connectexception); } - if (username != null && password != null) - { - Map<String,String> cred = new HashMap<String,String>(); - cred.put("username", username); - cred.put("password", password); - AuthenticationRequest req = new AuthenticationRequest(cred); - try - { - client.login(req); - } - catch (Exception e) - { - throw new DBException(e); - } + if (username != null && password != null) { + Map<String, String> cred = new HashMap<String, String>(); + cred.put("username", username); + cred.put("password", password); + AuthenticationRequest req = new AuthenticationRequest(cred); + try { + client.login(req); + } catch (Exception e) { + throw new DBException(e); + } } } @@ -183,8 +203,7 @@ public class CassandraClient10 extends DB * Cleanup any state for this DB. Called once per DB instance; there is one DB * instance per client thread. */ - public void cleanup() throws DBException - { + public void cleanup() throws DBException { tr.close(); } @@ -202,89 +221,85 @@ public class CassandraClient10 extends DB * A HashMap of field/value pairs for the result * @return Zero on success, a non-zero error code on error */ - public int read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) - { - if (!_table.equals(table)) { - try - { + public int read(String table, String key, Set<String> fields, + HashMap<String, ByteIterator> result) { + if (!tableName.equals(table)) { + try { client.set_keyspace(table); - _table = table; - } - catch (Exception e) - { + tableName = table; + } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); - return Error; + return ERROR; } } - for (int i = 0; i < OperationRetries; i++) - { + for (int i = 0; i < operationRetries; i++) { - try - { + try { SlicePredicate predicate; - if (fields == null) - { - predicate = new SlicePredicate().setSlice_range(new SliceRange(emptyByteBuffer, emptyByteBuffer, false, 1000000)); + if (fields == null) { + predicate = new SlicePredicate().setSlice_range(new SliceRange( + EMPTY_BYTE_BUFFER, EMPTY_BYTE_BUFFER, false, 1000000)); } else { - ArrayList<ByteBuffer> fieldlist = new ArrayList<ByteBuffer>(fields.size()); - for (String s : fields) - { + ArrayList<ByteBuffer> fieldlist = + new ArrayList<ByteBuffer>(fields.size()); + for (String s : fields) { fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); } predicate = new SlicePredicate().setColumn_names(fieldlist); } - List<ColumnOrSuperColumn> results = client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent, predicate, readConsistencyLevel); + List<ColumnOrSuperColumn> results = + client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent, + predicate, readConsistencyLevel); - if (_debug) - { + if (debug) { System.out.print("Reading key: " + key); } Column column; String name; ByteIterator value; - for (ColumnOrSuperColumn oneresult : results) - { + for (ColumnOrSuperColumn oneresult : results) { column = oneresult.column; - name = new String(column.name.array(), column.name.position()+column.name.arrayOffset(), column.name.remaining()); - value = new ByteArrayByteIterator(column.value.array(), column.value.position()+column.value.arrayOffset(), column.value.remaining()); + name = new String(column.name.array(), + column.name.position() + column.name.arrayOffset(), + column.name.remaining()); + value = new ByteArrayByteIterator(column.value.array(), + column.value.position() + column.value.arrayOffset(), + column.value.remaining()); - result.put(name,value); + result.put(name, value); - if (_debug) - { + if (debug) { System.out.print("(" + name + "=" + value + ")"); } } - if (_debug) - { + if (debug) { System.out.println(); - System.out.println("ConsistencyLevel=" + readConsistencyLevel.toString()); + System.out + .println("ConsistencyLevel=" + readConsistencyLevel.toString()); } - return Ok; - } catch (Exception e) - { + return OK; + } catch (Exception e) { errorexception = e; } - try - { + try { Thread.sleep(500); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } errorexception.printStackTrace(); errorexception.printStackTrace(System.out); - return Error; + return ERROR; } @@ -305,97 +320,91 @@ public class CassandraClient10 extends DB * pairs for one record * @return Zero on success, a non-zero error code on error */ - public int scan(String table, String startkey, int recordcount, Set<String> fields, - Vector<HashMap<String, ByteIterator>> result) - { - if (!_table.equals(table)) { - try - { + public int scan(String table, String startkey, int recordcount, + Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { + if (!tableName.equals(table)) { + try { client.set_keyspace(table); - _table = table; - } - catch (Exception e) - { + tableName = table; + } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); - return Error; + return ERROR; } } - for (int i = 0; i < OperationRetries; i++) - { + for (int i = 0; i < operationRetries; i++) { - try - { + try { SlicePredicate predicate; - if (fields == null) - { - predicate = new SlicePredicate().setSlice_range(new SliceRange(emptyByteBuffer, emptyByteBuffer, false, 1000000)); + if (fields == null) { + predicate = new SlicePredicate().setSlice_range(new SliceRange( + EMPTY_BYTE_BUFFER, EMPTY_BYTE_BUFFER, false, 1000000)); } else { - ArrayList<ByteBuffer> fieldlist = new ArrayList<ByteBuffer>(fields.size()); - for (String s : fields) - { - fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); + ArrayList<ByteBuffer> fieldlist = + new ArrayList<ByteBuffer>(fields.size()); + for (String s : fields) { + fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); } predicate = new SlicePredicate().setColumn_names(fieldlist); } - KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8")).setEnd_key(new byte[] {}).setCount(recordcount); + KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8")) + .setEnd_key(new byte[] {}).setCount(recordcount); - List<KeySlice> results = client.get_range_slices(parent, predicate, kr, scanConsistencyLevel); + List<KeySlice> results = client.get_range_slices(parent, predicate, kr, + scanConsistencyLevel); - if (_debug) - { + if (debug) { System.out.println("Scanning startkey: " + startkey); } HashMap<String, ByteIterator> tuple; - for (KeySlice oneresult : results) - { + for (KeySlice oneresult : results) { tuple = new HashMap<String, ByteIterator>(); Column column; String name; ByteIterator value; - for (ColumnOrSuperColumn onecol : oneresult.columns) - { - column = onecol.column; - name = new String(column.name.array(), column.name.position()+column.name.arrayOffset(), column.name.remaining()); - value = new ByteArrayByteIterator(column.value.array(), column.value.position()+column.value.arrayOffset(), column.value.remaining()); - - tuple.put(name, value); - - if (_debug) - { + for (ColumnOrSuperColumn onecol : oneresult.columns) { + column = onecol.column; + name = new String(column.name.array(), + column.name.position() + column.name.arrayOffset(), + column.name.remaining()); + value = new ByteArrayByteIterator(column.value.array(), + column.value.position() + column.value.arrayOffset(), + column.value.remaining()); + + tuple.put(name, value); + + if (debug) { System.out.print("(" + name + "=" + value + ")"); } } result.add(tuple); - if (_debug) - { + if (debug) { System.out.println(); - System.out.println("ConsistencyLevel=" + scanConsistencyLevel.toString()); + System.out + .println("ConsistencyLevel=" + scanConsistencyLevel.toString()); } } - return Ok; - } catch (Exception e) - { + return OK; + } catch (Exception e) { errorexception = e; } - try - { + try { Thread.sleep(500); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } errorexception.printStackTrace(); errorexception.printStackTrace(System.out); - return Error; + return ERROR; } /** @@ -411,8 +420,8 @@ public class CassandraClient10 extends DB * A HashMap of field/value pairs to update in the record * @return Zero on success, a non-zero error code on error */ - public int update(String table, String key, HashMap<String, ByteIterator> values) - { + public int update(String table, String key, + HashMap<String, ByteIterator> values) { return insert(table, key, values); } @@ -429,37 +438,30 @@ public class CassandraClient10 extends DB * A HashMap of field/value pairs to insert in the record * @return Zero on success, a non-zero error code on error */ - public int insert(String table, String key, HashMap<String, ByteIterator> values) - { - if (!_table.equals(table)) { - try - { + public int insert(String table, String key, + HashMap<String, ByteIterator> values) { + if (!tableName.equals(table)) { + try { client.set_keyspace(table); - _table = table; - } - catch (Exception e) - { + tableName = table; + } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); - return Error; + return ERROR; } } - for (int i = 0; i < OperationRetries; i++) - { - if (_debug) - { + for (int i = 0; i < operationRetries; i++) { + if (debug) { System.out.println("Inserting key: " + key); } - try - { + try { ByteBuffer wrappedKey = ByteBuffer.wrap(key.getBytes("UTF-8")); Column col; ColumnOrSuperColumn column; - for (Map.Entry<String, ByteIterator> entry : values.entrySet()) - { + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { col = new Column(); col.setName(ByteBuffer.wrap(entry.getKey().getBytes("UTF-8"))); col.setValue(ByteBuffer.wrap(entry.getValue().toArray())); @@ -471,7 +473,7 @@ public class CassandraClient10 extends DB mutations.add(new Mutation().setColumn_or_supercolumn(column)); } - mutationMap.put(column_family, mutations); + mutationMap.put(columnFamily, mutations); record.put(wrappedKey, mutationMap); client.batch_mutate(record, writeConsistencyLevel); @@ -479,28 +481,26 @@ public class CassandraClient10 extends DB mutations.clear(); mutationMap.clear(); record.clear(); - - if (_debug) - { - System.out.println("ConsistencyLevel=" + writeConsistencyLevel.toString()); + + if (debug) { + System.out + .println("ConsistencyLevel=" + writeConsistencyLevel.toString()); } - return Ok; - } catch (Exception e) - { + return OK; + } catch (Exception e) { errorexception = e; } - try - { + try { Thread.sleep(500); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } errorexception.printStackTrace(); errorexception.printStackTrace(System.out); - return Error; + return ERROR; } /** @@ -512,56 +512,46 @@ public class CassandraClient10 extends DB * The record key of the record to delete. * @return Zero on success, a non-zero error code on error */ - public int delete(String table, String key) - { - if (!_table.equals(table)) { - try - { + public int delete(String table, String key) { + if (!tableName.equals(table)) { + try { client.set_keyspace(table); - _table = table; - } - catch (Exception e) - { + tableName = table; + } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); - return Error; + return ERROR; } } - for (int i = 0; i < OperationRetries; i++) - { - try - { + for (int i = 0; i < operationRetries; i++) { + try { client.remove(ByteBuffer.wrap(key.getBytes("UTF-8")), - new ColumnPath(column_family), - System.currentTimeMillis(), - deleteConsistencyLevel); + new ColumnPath(columnFamily), System.currentTimeMillis(), + deleteConsistencyLevel); - if (_debug) - { + if (debug) { System.out.println("Delete key: " + key); - System.out.println("ConsistencyLevel=" + deleteConsistencyLevel.toString()); + System.out + .println("ConsistencyLevel=" + deleteConsistencyLevel.toString()); } - return Ok; - } catch (Exception e) - { + return OK; + } catch (Exception e) { errorexception = e; } - try - { + try { Thread.sleep(500); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } errorexception.printStackTrace(); errorexception.printStackTrace(System.out); - return Error; + return ERROR; } - public static void main(String[] args) - { + public static void main(String[] args) { CassandraClient10 cli = new CassandraClient10(); Properties props = new Properties(); @@ -569,11 +559,9 @@ public class CassandraClient10 extends DB props.setProperty("hosts", args[0]); cli.setProperties(props); - try - { + try { cli.init(); - } catch (Exception e) - { + } catch (Exception e) { e.printStackTrace(); System.exit(0); } @@ -592,59 +580,11 @@ public class CassandraClient10 extends DB fields.add("favoritecolor"); res = cli.read("usertable", "BrianFrankCooper", null, result); System.out.println("Result of read: " + res); - for (String s : result.keySet()) - { + for (String s : result.keySet()) { System.out.println("[" + s + "]=[" + result.get(s) + "]"); } res = cli.delete("usertable", "BrianFrankCooper"); System.out.println("Result of delete: " + res); } - - /* - * public static void main(String[] args) throws TException, - * InvalidRequestException, UnavailableException, - * UnsupportedEncodingException, NotFoundException { - * - * - * - * String key_user_id = "1"; - * - * - * - * - * client.insert("Keyspace1", key_user_id, new ColumnPath("Standard1", null, - * "age".getBytes("UTF-8")), "24".getBytes("UTF-8"), timestamp, - * ConsistencyLevel.ONE); - * - * - * // read single column ColumnPath path = new ColumnPath("Standard1", null, - * "name".getBytes("UTF-8")); - * - * System.out.println(client.get("Keyspace1", key_user_id, path, - * ConsistencyLevel.ONE)); - * - * - * // read entire row SlicePredicate predicate = new SlicePredicate(null, new - * SliceRange(new byte[0], new byte[0], false, 10)); - * - * ColumnParent parent = new ColumnParent("Standard1", null); - * - * List<ColumnOrSuperColumn> results = client.get_slice("Keyspace1", - * key_user_id, parent, predicate, ConsistencyLevel.ONE); - * - * for (ColumnOrSuperColumn result : results) { - * - * Column column = result.column; - * - * System.out.println(new String(column.name, "UTF-8") + " -> " + new - * String(column.value, "UTF-8")); - * - * } - * - * - * - * - * } - */ } diff --git a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient7.java b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient7.java index e60bc2986abbf46688ae711259e5238d488b82dd..2840460019edfb359873b8deb393bedc64162cb5 100644 --- a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient7.java +++ b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient7.java @@ -17,46 +17,57 @@ package com.yahoo.ycsb.db; -import com.yahoo.ycsb.*; - +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; -import java.util.HashMap; -import java.util.HashSet; import java.util.Vector; -import java.util.Random; -import java.util.Properties; -import java.nio.ByteBuffer; -import org.apache.thrift.transport.TTransport; +import org.apache.cassandra.thrift.AuthenticationRequest; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.Column; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnParent; +import org.apache.cassandra.thrift.ColumnPath; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.KeyRange; +import org.apache.cassandra.thrift.KeySlice; +import org.apache.cassandra.thrift.Mutation; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.SliceRange; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.cassandra.thrift.*; +import org.apache.thrift.transport.TTransport; +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.StringByteIterator; +import com.yahoo.ycsb.Utils; //XXXX if we do replication, fix the consistency levels /** - * Cassandra 0.7 client for YCSB framework + * Cassandra 0.7 client for YCSB framework. */ -public class CassandraClient7 extends DB -{ - static Random random = new Random(); - public static final int Ok = 0; - public static final int Error = -1; - public static final ByteBuffer emptyByteBuffer = ByteBuffer.wrap(new byte[0]); - - public int ConnectionRetries; - public int OperationRetries; - public String column_family; - - public static final String CONNECTION_RETRY_PROPERTY = "cassandra.connectionretries"; +public class CassandraClient7 extends DB { + public static final int OK = 0; + public static final int ERROR = -1; + public static final ByteBuffer EMPTY_BYTE_BUFFER = + ByteBuffer.wrap(new byte[0]); + + public static final String CONNECTION_RETRY_PROPERTY = + "cassandra.connectionretries"; public static final String CONNECTION_RETRY_PROPERTY_DEFAULT = "300"; - public static final String OPERATION_RETRY_PROPERTY = "cassandra.operationretries"; + public static final String OPERATION_RETRY_PROPERTY = + "cassandra.operationretries"; public static final String OPERATION_RETRY_PROPERTY_DEFAULT = "300"; public static final String USERNAME_PROPERTY = "cassandra.username"; @@ -65,94 +76,93 @@ public class CassandraClient7 extends DB public static final String COLUMN_FAMILY_PROPERTY = "cassandra.columnfamily"; public static final String COLUMN_FAMILY_PROPERTY_DEFAULT = "data"; - TTransport tr; - Cassandra.Client client; + private int connectionRetries; + private int operationRetries; + private String columnFamily; + + private TTransport tr; + private Cassandra.Client client; + + private boolean debug = false; - boolean _debug = false; - - String _table = ""; - Exception errorexception = null; - - List<Mutation> mutations = new ArrayList<Mutation>(); - Map<String, List<Mutation>> mutationMap = new HashMap<String, List<Mutation>>(); - Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); - - ColumnParent parent; + private String tableName = ""; + private Exception errorexception = null; + + private List<Mutation> mutations = new ArrayList<Mutation>(); + private Map<String, List<Mutation>> mutationMap = + new HashMap<String, List<Mutation>>(); + private Map<ByteBuffer, Map<String, List<Mutation>>> record = + new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); + + private ColumnParent parent; /** * Initialize any state for this DB. Called once per DB instance; there is one * DB instance per client thread. */ - public void init() throws DBException - { + public void init() throws DBException { String hosts = getProperties().getProperty("hosts"); - if (hosts == null) - { - throw new DBException("Required property \"hosts\" missing for CassandraClient"); + if (hosts == null) { + throw new DBException( + "Required property \"hosts\" missing for CassandraClient"); } - column_family = getProperties().getProperty(COLUMN_FAMILY_PROPERTY, COLUMN_FAMILY_PROPERTY_DEFAULT); - parent = new ColumnParent(column_family); + columnFamily = getProperties().getProperty(COLUMN_FAMILY_PROPERTY, + COLUMN_FAMILY_PROPERTY_DEFAULT); + parent = new ColumnParent(columnFamily); - ConnectionRetries = Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY, - CONNECTION_RETRY_PROPERTY_DEFAULT)); - OperationRetries = Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY, - OPERATION_RETRY_PROPERTY_DEFAULT)); + connectionRetries = + Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY, + CONNECTION_RETRY_PROPERTY_DEFAULT)); + operationRetries = + Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY, + OPERATION_RETRY_PROPERTY_DEFAULT)); String username = getProperties().getProperty(USERNAME_PROPERTY); String password = getProperties().getProperty(PASSWORD_PROPERTY); - _debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); + debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); String[] allhosts = hosts.split(","); - String myhost = allhosts[random.nextInt(allhosts.length)]; + String myhost = allhosts[Utils.random().nextInt(allhosts.length)]; Exception connectexception = null; - for (int retry = 0; retry < ConnectionRetries; retry++) - { + for (int retry = 0; retry < connectionRetries; retry++) { tr = new TFramedTransport(new TSocket(myhost, 9160)); TProtocol proto = new TBinaryProtocol(tr); client = new Cassandra.Client(proto); - try - { + try { tr.open(); connectexception = null; break; - } catch (Exception e) - { + } catch (Exception e) { connectexception = e; } - try - { + try { Thread.sleep(1000); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } - if (connectexception != null) - { - System.err.println("Unable to connect to " + myhost + " after " + ConnectionRetries - + " tries"); - System.out.println("Unable to connect to " + myhost + " after " + ConnectionRetries - + " tries"); + if (connectexception != null) { + System.err.println("Unable to connect to " + myhost + " after " + + connectionRetries + " tries"); + System.out.println("Unable to connect to " + myhost + " after " + + connectionRetries + " tries"); throw new DBException(connectexception); } - if (username != null && password != null) - { - Map<String,String> cred = new HashMap<String,String>(); - cred.put("username", username); - cred.put("password", password); - AuthenticationRequest req = new AuthenticationRequest(cred); - try - { - client.login(req); - } - catch (Exception e) - { - throw new DBException(e); - } + if (username != null && password != null) { + Map<String, String> cred = new HashMap<String, String>(); + cred.put("username", username); + cred.put("password", password); + AuthenticationRequest req = new AuthenticationRequest(cred); + try { + client.login(req); + } catch (Exception e) { + throw new DBException(e); + } } } @@ -160,8 +170,7 @@ public class CassandraClient7 extends DB * Cleanup any state for this DB. Called once per DB instance; there is one DB * instance per client thread. */ - public void cleanup() throws DBException - { + public void cleanup() throws DBException { tr.close(); } @@ -179,88 +188,85 @@ public class CassandraClient7 extends DB * A HashMap of field/value pairs for the result * @return Zero on success, a non-zero error code on error */ - public int read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) - { - if (!_table.equals(table)) { - try - { + public int read(String table, String key, Set<String> fields, + HashMap<String, ByteIterator> result) { + if (!tableName.equals(table)) { + try { client.set_keyspace(table); - _table = table; - } - catch (Exception e) - { + tableName = table; + } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); - return Error; + return ERROR; } } - for (int i = 0; i < OperationRetries; i++) - { + for (int i = 0; i < operationRetries; i++) { - try - { + try { SlicePredicate predicate; - if (fields == null) - { - predicate = new SlicePredicate().setSlice_range(new SliceRange(emptyByteBuffer, emptyByteBuffer, false, 1000000)); - + if (fields == null) { + SliceRange range = new SliceRange(EMPTY_BYTE_BUFFER, + EMPTY_BYTE_BUFFER, false, 1000000); + + predicate = new SlicePredicate().setSlice_range(range); + } else { - ArrayList<ByteBuffer> fieldlist = new ArrayList<ByteBuffer>(fields.size()); - for (String s : fields) - { + ArrayList<ByteBuffer> fieldlist = + new ArrayList<ByteBuffer>(fields.size()); + for (String s : fields) { fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); } predicate = new SlicePredicate().setColumn_names(fieldlist); } - List<ColumnOrSuperColumn> results = client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent, predicate, ConsistencyLevel.ONE); + List<ColumnOrSuperColumn> results = + client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent, + predicate, ConsistencyLevel.ONE); - if (_debug) - { + if (debug) { System.out.print("Reading key: " + key); } Column column; String name; ByteIterator value; - for (ColumnOrSuperColumn oneresult : results) - { + for (ColumnOrSuperColumn oneresult : results) { column = oneresult.column; - name = new String(column.name.array(), column.name.position()+column.name.arrayOffset(), column.name.remaining()); - value = new ByteArrayByteIterator(column.value.array(), column.value.position()+column.value.arrayOffset(), column.value.remaining()); + name = new String(column.name.array(), + column.name.position() + column.name.arrayOffset(), + column.name.remaining()); + value = new ByteArrayByteIterator(column.value.array(), + column.value.position() + column.value.arrayOffset(), + column.value.remaining()); - result.put(name,value); + result.put(name, value); - if (_debug) - { + if (debug) { System.out.print("(" + name + "=" + value + ")"); } } - if (_debug) - { + if (debug) { System.out.println(); } - return Ok; - } catch (Exception e) - { + return OK; + } catch (Exception e) { errorexception = e; } - try - { + try { Thread.sleep(500); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } errorexception.printStackTrace(); errorexception.printStackTrace(System.out); - return Error; + return ERROR; } @@ -281,96 +287,91 @@ public class CassandraClient7 extends DB * pairs for one record * @return Zero on success, a non-zero error code on error */ - public int scan(String table, String startkey, int recordcount, Set<String> fields, - Vector<HashMap<String, ByteIterator>> result) - { - if (!_table.equals(table)) { - try - { + public int scan(String table, String startkey, int recordcount, + Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { + if (!tableName.equals(table)) { + try { client.set_keyspace(table); - _table = table; - } - catch (Exception e) - { + tableName = table; + } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); - return Error; + return ERROR; } } - - for (int i = 0; i < OperationRetries; i++) - { - try - { + for (int i = 0; i < operationRetries; i++) { + + try { SlicePredicate predicate; - if (fields == null) - { - predicate = new SlicePredicate().setSlice_range(new SliceRange(emptyByteBuffer, emptyByteBuffer, false, 1000000)); - + if (fields == null) { + SliceRange range = new SliceRange(EMPTY_BYTE_BUFFER, + EMPTY_BYTE_BUFFER, false, 1000000); + + predicate = new SlicePredicate().setSlice_range(range); + } else { - ArrayList<ByteBuffer> fieldlist = new ArrayList<ByteBuffer>(fields.size()); - for (String s : fields) - { - fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); + ArrayList<ByteBuffer> fieldlist = + new ArrayList<ByteBuffer>(fields.size()); + for (String s : fields) { + fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); } - + predicate = new SlicePredicate().setColumn_names(fieldlist); } - - KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8")).setEnd_key(new byte[] {}).setCount(recordcount); - List<KeySlice> results = client.get_range_slices(parent, predicate, kr, ConsistencyLevel.ONE); + KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8")) + .setEnd_key(new byte[] {}).setCount(recordcount); - if (_debug) - { + List<KeySlice> results = client.get_range_slices(parent, predicate, kr, + ConsistencyLevel.ONE); + + if (debug) { System.out.println("Scanning startkey: " + startkey); } HashMap<String, ByteIterator> tuple; - for (KeySlice oneresult : results) - { + for (KeySlice oneresult : results) { tuple = new HashMap<String, ByteIterator>(); - + Column column; String name; ByteIterator value; - for (ColumnOrSuperColumn onecol : oneresult.columns) - { - column = onecol.column; - name = new String(column.name.array(), column.name.position()+column.name.arrayOffset(), column.name.remaining()); - value = new ByteArrayByteIterator(column.value.array(), column.value.position()+column.value.arrayOffset(), column.value.remaining()); - - tuple.put(name, value); - - if (_debug) - { + for (ColumnOrSuperColumn onecol : oneresult.columns) { + column = onecol.column; + name = new String(column.name.array(), + column.name.position() + column.name.arrayOffset(), + column.name.remaining()); + value = new ByteArrayByteIterator(column.value.array(), + column.value.position() + column.value.arrayOffset(), + column.value.remaining()); + + tuple.put(name, value); + + if (debug) { System.out.print("(" + name + "=" + value + ")"); } } result.add(tuple); - if (_debug) - { + if (debug) { System.out.println(); } } - return Ok; - } catch (Exception e) - { + return OK; + } catch (Exception e) { errorexception = e; } - try - { + try { Thread.sleep(500); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } errorexception.printStackTrace(); errorexception.printStackTrace(System.out); - return Error; + return ERROR; } /** @@ -386,8 +387,8 @@ public class CassandraClient7 extends DB * A HashMap of field/value pairs to update in the record * @return Zero on success, a non-zero error code on error */ - public int update(String table, String key, HashMap<String, ByteIterator> values) - { + public int update(String table, String key, + HashMap<String, ByteIterator> values) { return insert(table, key, values); } @@ -404,37 +405,30 @@ public class CassandraClient7 extends DB * A HashMap of field/value pairs to insert in the record * @return Zero on success, a non-zero error code on error */ - public int insert(String table, String key, HashMap<String, ByteIterator> values) - { - if (!_table.equals(table)) { - try - { + public int insert(String table, String key, + HashMap<String, ByteIterator> values) { + if (!tableName.equals(table)) { + try { client.set_keyspace(table); - _table = table; - } - catch (Exception e) - { + tableName = table; + } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); - return Error; + return ERROR; } } - - for (int i = 0; i < OperationRetries; i++) - { - if (_debug) - { + + for (int i = 0; i < operationRetries; i++) { + if (debug) { System.out.println("Inserting key: " + key); } - - try - { + + try { ByteBuffer wrappedKey = ByteBuffer.wrap(key.getBytes("UTF-8")); Column col; ColumnOrSuperColumn column; - for (Map.Entry<String, ByteIterator> entry : values.entrySet()) - { + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { col = new Column(); col.setName(ByteBuffer.wrap(entry.getKey().getBytes("UTF-8"))); col.setValue(ByteBuffer.wrap(entry.getValue().toArray())); @@ -442,35 +436,33 @@ public class CassandraClient7 extends DB column = new ColumnOrSuperColumn(); column.setColumn(col); - + mutations.add(new Mutation().setColumn_or_supercolumn(column)); } - - mutationMap.put(column_family, mutations); + + mutationMap.put(columnFamily, mutations); record.put(wrappedKey, mutationMap); client.batch_mutate(record, ConsistencyLevel.ONE); - + mutations.clear(); mutationMap.clear(); record.clear(); - return Ok; - } catch (Exception e) - { + return OK; + } catch (Exception e) { errorexception = e; } - try - { + try { Thread.sleep(500); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } errorexception.printStackTrace(); errorexception.printStackTrace(System.out); - return Error; + return ERROR; } /** @@ -482,55 +474,44 @@ public class CassandraClient7 extends DB * The record key of the record to delete. * @return Zero on success, a non-zero error code on error */ - public int delete(String table, String key) - { - if (!_table.equals(table)) { - try - { + public int delete(String table, String key) { + if (!tableName.equals(table)) { + try { client.set_keyspace(table); - _table = table; - } - catch (Exception e) - { + tableName = table; + } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); - return Error; + return ERROR; } } - for (int i = 0; i < OperationRetries; i++) - { - try - { - client.remove(ByteBuffer.wrap(key.getBytes("UTF-8")), - new ColumnPath(column_family), - System.currentTimeMillis(), - ConsistencyLevel.ONE); - - if (_debug) - { + for (int i = 0; i < operationRetries; i++) { + try { + client.remove(ByteBuffer.wrap(key.getBytes("UTF-8")), + new ColumnPath(columnFamily), System.currentTimeMillis(), + ConsistencyLevel.ONE); + + if (debug) { System.out.println("Delete key: " + key); } - return Ok; - } catch (Exception e) - { + return OK; + } catch (Exception e) { errorexception = e; } - try - { + try { Thread.sleep(500); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } errorexception.printStackTrace(); errorexception.printStackTrace(System.out); - return Error; + return ERROR; } - public static void main(String[] args) - { + public static void main(String[] args) { CassandraClient7 cli = new CassandraClient7(); Properties props = new Properties(); @@ -538,11 +519,9 @@ public class CassandraClient7 extends DB props.setProperty("hosts", args[0]); cli.setProperties(props); - try - { + try { cli.init(); - } catch (Exception e) - { + } catch (Exception e) { e.printStackTrace(); System.exit(0); } @@ -561,59 +540,11 @@ public class CassandraClient7 extends DB fields.add("favoritecolor"); res = cli.read("usertable", "BrianFrankCooper", null, result); System.out.println("Result of read: " + res); - for (String s : result.keySet()) - { + for (String s : result.keySet()) { System.out.println("[" + s + "]=[" + result.get(s) + "]"); } res = cli.delete("usertable", "BrianFrankCooper"); System.out.println("Result of delete: " + res); } - - /* - * public static void main(String[] args) throws TException, - * InvalidRequestException, UnavailableException, - * UnsupportedEncodingException, NotFoundException { - * - * - * - * String key_user_id = "1"; - * - * - * - * - * client.insert("Keyspace1", key_user_id, new ColumnPath("Standard1", null, - * "age".getBytes("UTF-8")), "24".getBytes("UTF-8"), timestamp, - * ConsistencyLevel.ONE); - * - * - * // read single column ColumnPath path = new ColumnPath("Standard1", null, - * "name".getBytes("UTF-8")); - * - * System.out.println(client.get("Keyspace1", key_user_id, path, - * ConsistencyLevel.ONE)); - * - * - * // read entire row SlicePredicate predicate = new SlicePredicate(null, new - * SliceRange(new byte[0], new byte[0], false, 10)); - * - * ColumnParent parent = new ColumnParent("Standard1", null); - * - * List<ColumnOrSuperColumn> results = client.get_slice("Keyspace1", - * key_user_id, parent, predicate, ConsistencyLevel.ONE); - * - * for (ColumnOrSuperColumn result : results) { - * - * Column column = result.column; - * - * System.out.println(new String(column.name, "UTF-8") + " -> " + new - * String(column.value, "UTF-8")); - * - * } - * - * - * - * - * } - */ } diff --git a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient8.java b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient8.java index ec8abf3234eea3aec767789ba09b0275e0a6bdf5..afcaf283fe93df832cc20d09f6b3a7b84eeee83d 100644 --- a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient8.java +++ b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraClient8.java @@ -26,7 +26,6 @@ import java.util.Set; import java.util.HashMap; import java.util.HashSet; import java.util.Vector; -import java.util.Random; import java.util.Properties; import java.nio.ByteBuffer; @@ -37,26 +36,22 @@ import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.cassandra.thrift.*; - //XXXX if we do replication, fix the consistency levels /** - * Cassandra 0.8 client for YCSB framework + * Cassandra 0.8 client for YCSB framework. */ -public class CassandraClient8 extends DB -{ - static Random random = new Random(); - public static final int Ok = 0; - public static final int Error = -1; - public static final ByteBuffer emptyByteBuffer = ByteBuffer.wrap(new byte[0]); - - public int ConnectionRetries; - public int OperationRetries; - public String column_family; - - public static final String CONNECTION_RETRY_PROPERTY = "cassandra.connectionretries"; +public class CassandraClient8 extends DB { + public static final int OK = 0; + public static final int ERROR = -1; + public static final ByteBuffer EMPTY_BYTE_BUFFER = + ByteBuffer.wrap(new byte[0]); + + public static final String CONNECTION_RETRY_PROPERTY = + "cassandra.connectionretries"; public static final String CONNECTION_RETRY_PROPERTY_DEFAULT = "300"; - public static final String OPERATION_RETRY_PROPERTY = "cassandra.operationretries"; + public static final String OPERATION_RETRY_PROPERTY = + "cassandra.operationretries"; public static final String OPERATION_RETRY_PROPERTY_DEFAULT = "300"; public static final String USERNAME_PROPERTY = "cassandra.username"; @@ -65,94 +60,93 @@ public class CassandraClient8 extends DB public static final String COLUMN_FAMILY_PROPERTY = "cassandra.columnfamily"; public static final String COLUMN_FAMILY_PROPERTY_DEFAULT = "data"; - TTransport tr; - Cassandra.Client client; + private int connectionRetries; + private int operationRetries; + private String columnFamily; + + private TTransport tr; + private Cassandra.Client client; + + private boolean debug = false; + + private String tableName = ""; + private Exception errorexception = null; - boolean _debug = false; - - String _table = ""; - Exception errorexception = null; - - List<Mutation> mutations = new ArrayList<Mutation>(); - Map<String, List<Mutation>> mutationMap = new HashMap<String, List<Mutation>>(); - Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); - - ColumnParent parent; + private List<Mutation> mutations = new ArrayList<Mutation>(); + private Map<String, List<Mutation>> mutationMap = + new HashMap<String, List<Mutation>>(); + private Map<ByteBuffer, Map<String, List<Mutation>>> record = + new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); + + private ColumnParent parent; /** * Initialize any state for this DB. Called once per DB instance; there is one * DB instance per client thread. */ - public void init() throws DBException - { + public void init() throws DBException { String hosts = getProperties().getProperty("hosts"); - if (hosts == null) - { - throw new DBException("Required property \"hosts\" missing for CassandraClient"); + if (hosts == null) { + throw new DBException( + "Required property \"hosts\" missing for CassandraClient"); } - column_family = getProperties().getProperty(COLUMN_FAMILY_PROPERTY, COLUMN_FAMILY_PROPERTY_DEFAULT); - parent = new ColumnParent(column_family); + columnFamily = getProperties().getProperty(COLUMN_FAMILY_PROPERTY, + COLUMN_FAMILY_PROPERTY_DEFAULT); + parent = new ColumnParent(columnFamily); - ConnectionRetries = Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY, - CONNECTION_RETRY_PROPERTY_DEFAULT)); - OperationRetries = Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY, - OPERATION_RETRY_PROPERTY_DEFAULT)); + connectionRetries = + Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY, + CONNECTION_RETRY_PROPERTY_DEFAULT)); + operationRetries = + Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY, + OPERATION_RETRY_PROPERTY_DEFAULT)); String username = getProperties().getProperty(USERNAME_PROPERTY); String password = getProperties().getProperty(PASSWORD_PROPERTY); - _debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); + debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); String[] allhosts = hosts.split(","); - String myhost = allhosts[random.nextInt(allhosts.length)]; + String myhost = allhosts[Utils.random().nextInt(allhosts.length)]; Exception connectexception = null; - for (int retry = 0; retry < ConnectionRetries; retry++) - { + for (int retry = 0; retry < connectionRetries; retry++) { tr = new TFramedTransport(new TSocket(myhost, 9160)); TProtocol proto = new TBinaryProtocol(tr); client = new Cassandra.Client(proto); - try - { + try { tr.open(); connectexception = null; break; - } catch (Exception e) - { + } catch (Exception e) { connectexception = e; } - try - { + try { Thread.sleep(1000); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } - if (connectexception != null) - { - System.err.println("Unable to connect to " + myhost + " after " + ConnectionRetries - + " tries"); - System.out.println("Unable to connect to " + myhost + " after " + ConnectionRetries - + " tries"); + if (connectexception != null) { + System.err.println("Unable to connect to " + myhost + " after " + + connectionRetries + " tries"); + System.out.println("Unable to connect to " + myhost + " after " + + connectionRetries + " tries"); throw new DBException(connectexception); } - if (username != null && password != null) - { - Map<String,String> cred = new HashMap<String,String>(); - cred.put("username", username); - cred.put("password", password); - AuthenticationRequest req = new AuthenticationRequest(cred); - try - { - client.login(req); - } - catch (Exception e) - { - throw new DBException(e); - } + if (username != null && password != null) { + Map<String, String> cred = new HashMap<String, String>(); + cred.put("username", username); + cred.put("password", password); + AuthenticationRequest req = new AuthenticationRequest(cred); + try { + client.login(req); + } catch (Exception e) { + throw new DBException(e); + } } } @@ -160,8 +154,7 @@ public class CassandraClient8 extends DB * Cleanup any state for this DB. Called once per DB instance; there is one DB * instance per client thread. */ - public void cleanup() throws DBException - { + public void cleanup() throws DBException { tr.close(); } @@ -179,88 +172,83 @@ public class CassandraClient8 extends DB * A HashMap of field/value pairs for the result * @return Zero on success, a non-zero error code on error */ - public int read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) - { - if (!_table.equals(table)) { - try - { + public int read(String table, String key, Set<String> fields, + HashMap<String, ByteIterator> result) { + if (!tableName.equals(table)) { + try { client.set_keyspace(table); - _table = table; - } - catch (Exception e) - { + tableName = table; + } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); - return Error; + return ERROR; } } - for (int i = 0; i < OperationRetries; i++) - { + for (int i = 0; i < operationRetries; i++) { - try - { + try { SlicePredicate predicate; - if (fields == null) - { - predicate = new SlicePredicate().setSlice_range(new SliceRange(emptyByteBuffer, emptyByteBuffer, false, 1000000)); - + if (fields == null) { + predicate = new SlicePredicate().setSlice_range(new SliceRange( + EMPTY_BYTE_BUFFER, EMPTY_BYTE_BUFFER, false, 1000000)); + } else { - ArrayList<ByteBuffer> fieldlist = new ArrayList<ByteBuffer>(fields.size()); - for (String s : fields) - { + ArrayList<ByteBuffer> fieldlist = + new ArrayList<ByteBuffer>(fields.size()); + for (String s : fields) { fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); } predicate = new SlicePredicate().setColumn_names(fieldlist); } - List<ColumnOrSuperColumn> results = client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent, predicate, ConsistencyLevel.ONE); + List<ColumnOrSuperColumn> results = + client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent, + predicate, ConsistencyLevel.ONE); - if (_debug) - { + if (debug) { System.out.print("Reading key: " + key); } Column column; String name; ByteIterator value; - for (ColumnOrSuperColumn oneresult : results) - { + for (ColumnOrSuperColumn oneresult : results) { column = oneresult.column; - name = new String(column.name.array(), column.name.position()+column.name.arrayOffset(), column.name.remaining()); - value = new ByteArrayByteIterator(column.value.array(), column.value.position()+column.value.arrayOffset(), column.value.remaining()); + name = new String(column.name.array(), + column.name.position() + column.name.arrayOffset(), + column.name.remaining()); + value = new ByteArrayByteIterator(column.value.array(), + column.value.position() + column.value.arrayOffset(), + column.value.remaining()); - result.put(name,value); + result.put(name, value); - if (_debug) - { + if (debug) { System.out.print("(" + name + "=" + value + ")"); } } - if (_debug) - { + if (debug) { System.out.println(); } - return Ok; - } catch (Exception e) - { + return OK; + } catch (Exception e) { errorexception = e; } - try - { + try { Thread.sleep(500); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } errorexception.printStackTrace(); errorexception.printStackTrace(System.out); - return Error; + return ERROR; } @@ -281,96 +269,89 @@ public class CassandraClient8 extends DB * pairs for one record * @return Zero on success, a non-zero error code on error */ - public int scan(String table, String startkey, int recordcount, Set<String> fields, - Vector<HashMap<String, ByteIterator>> result) - { - if (!_table.equals(table)) { - try - { + public int scan(String table, String startkey, int recordcount, + Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { + if (!tableName.equals(table)) { + try { client.set_keyspace(table); - _table = table; - } - catch (Exception e) - { + tableName = table; + } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); - return Error; + return ERROR; } } - - for (int i = 0; i < OperationRetries; i++) - { - try - { + for (int i = 0; i < operationRetries; i++) { + + try { SlicePredicate predicate; - if (fields == null) - { - predicate = new SlicePredicate().setSlice_range(new SliceRange(emptyByteBuffer, emptyByteBuffer, false, 1000000)); - + if (fields == null) { + predicate = new SlicePredicate().setSlice_range(new SliceRange( + EMPTY_BYTE_BUFFER, EMPTY_BYTE_BUFFER, false, 1000000)); + } else { - ArrayList<ByteBuffer> fieldlist = new ArrayList<ByteBuffer>(fields.size()); - for (String s : fields) - { - fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); + ArrayList<ByteBuffer> fieldlist = + new ArrayList<ByteBuffer>(fields.size()); + for (String s : fields) { + fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); } - + predicate = new SlicePredicate().setColumn_names(fieldlist); } - - KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8")).setEnd_key(new byte[] {}).setCount(recordcount); - List<KeySlice> results = client.get_range_slices(parent, predicate, kr, ConsistencyLevel.ONE); + KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8")) + .setEnd_key(new byte[] {}).setCount(recordcount); + + List<KeySlice> results = client.get_range_slices(parent, predicate, kr, + ConsistencyLevel.ONE); - if (_debug) - { + if (debug) { System.out.println("Scanning startkey: " + startkey); } HashMap<String, ByteIterator> tuple; - for (KeySlice oneresult : results) - { + for (KeySlice oneresult : results) { tuple = new HashMap<String, ByteIterator>(); - + Column column; String name; ByteIterator value; - for (ColumnOrSuperColumn onecol : oneresult.columns) - { - column = onecol.column; - name = new String(column.name.array(), column.name.position()+column.name.arrayOffset(), column.name.remaining()); - value = new ByteArrayByteIterator(column.value.array(), column.value.position()+column.value.arrayOffset(), column.value.remaining()); - - tuple.put(name, value); - - if (_debug) - { + for (ColumnOrSuperColumn onecol : oneresult.columns) { + column = onecol.column; + name = new String(column.name.array(), + column.name.position() + column.name.arrayOffset(), + column.name.remaining()); + value = new ByteArrayByteIterator(column.value.array(), + column.value.position() + column.value.arrayOffset(), + column.value.remaining()); + + tuple.put(name, value); + + if (debug) { System.out.print("(" + name + "=" + value + ")"); } } result.add(tuple); - if (_debug) - { + if (debug) { System.out.println(); } } - return Ok; - } catch (Exception e) - { + return OK; + } catch (Exception e) { errorexception = e; } - try - { + try { Thread.sleep(500); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } errorexception.printStackTrace(); errorexception.printStackTrace(System.out); - return Error; + return ERROR; } /** @@ -386,8 +367,8 @@ public class CassandraClient8 extends DB * A HashMap of field/value pairs to update in the record * @return Zero on success, a non-zero error code on error */ - public int update(String table, String key, HashMap<String, ByteIterator> values) - { + public int update(String table, String key, + HashMap<String, ByteIterator> values) { return insert(table, key, values); } @@ -404,37 +385,30 @@ public class CassandraClient8 extends DB * A HashMap of field/value pairs to insert in the record * @return Zero on success, a non-zero error code on error */ - public int insert(String table, String key, HashMap<String, ByteIterator> values) - { - if (!_table.equals(table)) { - try - { + public int insert(String table, String key, + HashMap<String, ByteIterator> values) { + if (!tableName.equals(table)) { + try { client.set_keyspace(table); - _table = table; - } - catch (Exception e) - { + tableName = table; + } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); - return Error; + return ERROR; } } - - for (int i = 0; i < OperationRetries; i++) - { - if (_debug) - { + + for (int i = 0; i < operationRetries; i++) { + if (debug) { System.out.println("Inserting key: " + key); } - - try - { + + try { ByteBuffer wrappedKey = ByteBuffer.wrap(key.getBytes("UTF-8")); Column col; ColumnOrSuperColumn column; - for (Map.Entry<String, ByteIterator> entry : values.entrySet()) - { + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { col = new Column(); col.setName(ByteBuffer.wrap(entry.getKey().getBytes("UTF-8"))); col.setValue(ByteBuffer.wrap(entry.getValue().toArray())); @@ -442,35 +416,33 @@ public class CassandraClient8 extends DB column = new ColumnOrSuperColumn(); column.setColumn(col); - + mutations.add(new Mutation().setColumn_or_supercolumn(column)); } - - mutationMap.put(column_family, mutations); + + mutationMap.put(columnFamily, mutations); record.put(wrappedKey, mutationMap); client.batch_mutate(record, ConsistencyLevel.ONE); - + mutations.clear(); mutationMap.clear(); record.clear(); - return Ok; - } catch (Exception e) - { + return OK; + } catch (Exception e) { errorexception = e; } - try - { + try { Thread.sleep(500); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } errorexception.printStackTrace(); errorexception.printStackTrace(System.out); - return Error; + return ERROR; } /** @@ -482,55 +454,44 @@ public class CassandraClient8 extends DB * The record key of the record to delete. * @return Zero on success, a non-zero error code on error */ - public int delete(String table, String key) - { - if (!_table.equals(table)) { - try - { + public int delete(String table, String key) { + if (!tableName.equals(table)) { + try { client.set_keyspace(table); - _table = table; - } - catch (Exception e) - { + tableName = table; + } catch (Exception e) { e.printStackTrace(); e.printStackTrace(System.out); - return Error; + return ERROR; } } - for (int i = 0; i < OperationRetries; i++) - { - try - { - client.remove(ByteBuffer.wrap(key.getBytes("UTF-8")), - new ColumnPath(column_family), - System.currentTimeMillis(), - ConsistencyLevel.ONE); - - if (_debug) - { + for (int i = 0; i < operationRetries; i++) { + try { + client.remove(ByteBuffer.wrap(key.getBytes("UTF-8")), + new ColumnPath(columnFamily), System.currentTimeMillis(), + ConsistencyLevel.ONE); + + if (debug) { System.out.println("Delete key: " + key); } - return Ok; - } catch (Exception e) - { + return OK; + } catch (Exception e) { errorexception = e; } - try - { + try { Thread.sleep(500); - } catch (InterruptedException e) - { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } errorexception.printStackTrace(); errorexception.printStackTrace(System.out); - return Error; + return ERROR; } - public static void main(String[] args) - { + public static void main(String[] args) { CassandraClient8 cli = new CassandraClient8(); Properties props = new Properties(); @@ -538,11 +499,9 @@ public class CassandraClient8 extends DB props.setProperty("hosts", args[0]); cli.setProperties(props); - try - { + try { cli.init(); - } catch (Exception e) - { + } catch (Exception e) { e.printStackTrace(); System.exit(0); } @@ -561,59 +520,11 @@ public class CassandraClient8 extends DB fields.add("favoritecolor"); res = cli.read("usertable", "BrianFrankCooper", null, result); System.out.println("Result of read: " + res); - for (String s : result.keySet()) - { + for (String s : result.keySet()) { System.out.println("[" + s + "]=[" + result.get(s) + "]"); } res = cli.delete("usertable", "BrianFrankCooper"); System.out.println("Result of delete: " + res); } - - /* - * public static void main(String[] args) throws TException, - * InvalidRequestException, UnavailableException, - * UnsupportedEncodingException, NotFoundException { - * - * - * - * String key_user_id = "1"; - * - * - * - * - * client.insert("Keyspace1", key_user_id, new ColumnPath("Standard1", null, - * "age".getBytes("UTF-8")), "24".getBytes("UTF-8"), timestamp, - * ConsistencyLevel.ONE); - * - * - * // read single column ColumnPath path = new ColumnPath("Standard1", null, - * "name".getBytes("UTF-8")); - * - * System.out.println(client.get("Keyspace1", key_user_id, path, - * ConsistencyLevel.ONE)); - * - * - * // read entire row SlicePredicate predicate = new SlicePredicate(null, new - * SliceRange(new byte[0], new byte[0], false, 10)); - * - * ColumnParent parent = new ColumnParent("Standard1", null); - * - * List<ColumnOrSuperColumn> results = client.get_slice("Keyspace1", - * key_user_id, parent, predicate, ConsistencyLevel.ONE); - * - * for (ColumnOrSuperColumn result : results) { - * - * Column column = result.column; - * - * System.out.println(new String(column.name, "UTF-8") + " -> " + new - * String(column.value, "UTF-8")); - * - * } - * - * - * - * - * } - */ } diff --git a/cassandra/src/main/java/com/yahoo/ycsb/db/package-info.java b/cassandra/src/main/java/com/yahoo/ycsb/db/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..88ce1f0111a374f41bf98efe0f5350beb6b9b140 --- /dev/null +++ b/cassandra/src/main/java/com/yahoo/ycsb/db/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2014, Yahoo!, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + * The YCSB binding for <a href="http://cassandra.apache.org/">Cassandra</a> + * versions 0.7, 0.8, and 1.0.X. + */ +package com.yahoo.ycsb.db; + diff --git a/cassandra2/pom.xml b/cassandra2/pom.xml index 7ad8132d61d5f83ab5939efa9eb521dbc889325a..e247c2f6b1b5b91fba7113952c65d36456a20afe 100644 --- a/cassandra2/pom.xml +++ b/cassandra2/pom.xml @@ -56,6 +56,29 @@ LICENSE file. <version>4.12</version> <scope>test</scope> </dependency> - </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.15</version> + <configuration> + <consoleOutput>true</consoleOutput> + <configLocation>../checkstyle.xml</configLocation> + <failOnViolation>true</failOnViolation> + <failsOnError>true</failsOnError> + </configuration> + <executions> + <execution> + <id>validate</id> + <phase>validate</phase> + <goals> + <goal>checkstyle</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> diff --git a/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java b/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java index 0fa2b3014fb9e34e8aa0655051a779b0daa02408..eca2a6779fec791e7bc4313c5e41312bfda9e073 100644 --- a/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java +++ b/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java @@ -43,7 +43,6 @@ import java.util.Set; import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; - /** * Cassandra 2.x CQL client. * @@ -53,381 +52,411 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class CassandraCQLClient extends DB { - protected static Cluster cluster = null; - protected static Session session = null; - - private static ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE; - private static ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE; + private static Cluster cluster = null; + private static Session session = null; + + private static ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE; + private static ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE; + + public static final int OK = 0; + public static final int ERR = -1; + public static final int NOT_FOUND = -3; + + public static final String YCSB_KEY = "y_id"; + public static final String KEYSPACE_PROPERTY = "cassandra.keyspace"; + public static final String KEYSPACE_PROPERTY_DEFAULT = "ycsb"; + public static final String USERNAME_PROPERTY = "cassandra.username"; + public static final String PASSWORD_PROPERTY = "cassandra.password"; + + public static final String HOSTS_PROPERTY = "hosts"; + public static final String PORT_PROPERTY = "port"; + + public static final String READ_CONSISTENCY_LEVEL_PROPERTY = + "cassandra.readconsistencylevel"; + public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; + public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = + "cassandra.writeconsistencylevel"; + public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; + + /** + * Count the number of times initialized to teardown on the last + * {@link #cleanup()}. + */ + private static final AtomicInteger INIT_COUNT = new AtomicInteger(0); + + private static boolean debug = false; + + /** + * Initialize any state for this DB. Called once per DB instance; there is one + * DB instance per client thread. + */ + @Override + public void init() throws DBException { + + // Keep track of number of calls to init (for later cleanup) + INIT_COUNT.incrementAndGet(); + + // Synchronized so that we only have a single + // cluster/session instance for all the threads. + synchronized (INIT_COUNT) { + + // Check if the cluster has already been initialized + if (cluster != null) { + return; + } - public static final int OK = 0; - public static final int ERR = -1; - public static final int NOT_FOUND = -3; - - public static final String YCSB_KEY = "y_id"; - public static final String KEYSPACE_PROPERTY = "cassandra.keyspace"; - public static final String KEYSPACE_PROPERTY_DEFAULT = "ycsb"; - public static final String USERNAME_PROPERTY = "cassandra.username"; - public static final String PASSWORD_PROPERTY = "cassandra.password"; - - public static final String HOSTS_PROPERTY = "hosts"; - public static final String PORT_PROPERTY = "port"; - - - public static final String READ_CONSISTENCY_LEVEL_PROPERTY = "cassandra.readconsistencylevel"; - public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; - public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = "cassandra.writeconsistencylevel"; - public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; - - /** Count the number of times initialized to teardown on the last {@link #cleanup()}. */ - private static final AtomicInteger initCount = new AtomicInteger(0); - - private static boolean _debug = false; - - /** - * Initialize any state for this DB. Called once per DB instance; there is - * one DB instance per client thread. - */ - @Override - public void init() throws DBException { - - //Keep track of number of calls to init (for later cleanup) - initCount.incrementAndGet(); - - //Synchronized so that we only have a single - // cluster/session instance for all the threads. - synchronized (initCount) { - - //Check if the cluster has already been initialized - if (cluster != null) { - return; - } - - try { - - _debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); - - String host = getProperties().getProperty(HOSTS_PROPERTY); - if (host == null) { - throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", HOSTS_PROPERTY)); - } - String hosts[] = host.split(","); - String port = getProperties().getProperty("port", "9042"); - if (port == null) { - throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", PORT_PROPERTY)); - } - - String username = getProperties().getProperty(USERNAME_PROPERTY); - String password = getProperties().getProperty(PASSWORD_PROPERTY); - - String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY, KEYSPACE_PROPERTY_DEFAULT); - - readConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - writeConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); - - // public void connect(String node) {} - if ((username != null) && !username.isEmpty()) { - cluster = Cluster.builder() - .withCredentials(username, password) - .withPort(Integer.valueOf(port)) - .addContactPoints(hosts).build(); - } - else { - cluster = Cluster.builder() - .withPort(Integer.valueOf(port)) - .addContactPoints(hosts).build(); - } - - //Update number of connections based on threads - int threadcount = Integer.parseInt(getProperties().getProperty("threadcount","1")); - cluster.getConfiguration().getPoolingOptions().setMaxConnectionsPerHost(HostDistance.LOCAL, threadcount); - - //Set connection timeout 3min (default is 5s) - cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis(3*60*1000); - //Set read (execute) timeout 3min (default is 12s) - cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(3*60*1000); - - Metadata metadata = cluster.getMetadata(); - System.err.printf("Connected to cluster: %s\n", metadata.getClusterName()); - - for (Host discoveredHost : metadata.getAllHosts()) { - System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", - discoveredHost.getDatacenter(), - discoveredHost.getAddress(), - discoveredHost.getRack()); - } - - session = cluster.connect(keyspace); - - } catch (Exception e) { - throw new DBException(e); - } - }//synchronized - } + try { + + debug = + Boolean.parseBoolean(getProperties().getProperty("debug", "false")); + + String host = getProperties().getProperty(HOSTS_PROPERTY); + if (host == null) { + throw new DBException(String.format( + "Required property \"%s\" missing for CassandraCQLClient", + HOSTS_PROPERTY)); + } + String[] hosts = host.split(","); + String port = getProperties().getProperty("port", "9042"); + if (port == null) { + throw new DBException(String.format( + "Required property \"%s\" missing for CassandraCQLClient", + PORT_PROPERTY)); + } - /** - * Cleanup any state for this DB. Called once per DB instance; there is one - * DB instance per client thread. - */ - @Override - public void cleanup() throws DBException { - synchronized(initCount) { - final int curInitCount = initCount.decrementAndGet(); - if (curInitCount <= 0) { - session.close(); - cluster.close(); - cluster = null; - session = null; + String username = getProperties().getProperty(USERNAME_PROPERTY); + String password = getProperties().getProperty(PASSWORD_PROPERTY); + + String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY, + KEYSPACE_PROPERTY_DEFAULT); + + readConsistencyLevel = ConsistencyLevel.valueOf( + getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, + READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); + writeConsistencyLevel = ConsistencyLevel.valueOf( + getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, + WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); + + // public void connect(String node) {} + if ((username != null) && !username.isEmpty()) { + cluster = Cluster.builder().withCredentials(username, password) + .withPort(Integer.valueOf(port)).addContactPoints(hosts).build(); + } else { + cluster = Cluster.builder().withPort(Integer.valueOf(port)) + .addContactPoints(hosts).build(); } - if (curInitCount < 0) { - // This should never happen. - throw new DBException( - String.format("initCount is negative: %d", curInitCount)); + + // Update number of connections based on threads + int threadcount = + Integer.parseInt(getProperties().getProperty("threadcount", "1")); + cluster.getConfiguration().getPoolingOptions() + .setMaxConnectionsPerHost(HostDistance.LOCAL, threadcount); + + // Set connection timeout 3min (default is 5s) + cluster.getConfiguration().getSocketOptions() + .setConnectTimeoutMillis(3 * 60 * 1000); + // Set read (execute) timeout 3min (default is 12s) + cluster.getConfiguration().getSocketOptions() + .setReadTimeoutMillis(3 * 60 * 1000); + + Metadata metadata = cluster.getMetadata(); + System.err.printf("Connected to cluster: %s\n", + metadata.getClusterName()); + + for (Host discoveredHost : metadata.getAllHosts()) { + System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", + discoveredHost.getDatacenter(), discoveredHost.getAddress(), + discoveredHost.getRack()); } + + session = cluster.connect(keyspace); + + } catch (Exception e) { + throw new DBException(e); + } + } // synchronized + } + + /** + * Cleanup any state for this DB. Called once per DB instance; there is one DB + * instance per client thread. + */ + @Override + public void cleanup() throws DBException { + synchronized (INIT_COUNT) { + final int curInitCount = INIT_COUNT.decrementAndGet(); + if (curInitCount <= 0) { + session.close(); + cluster.close(); + cluster = null; + session = null; + } + if (curInitCount < 0) { + // This should never happen. + throw new DBException( + String.format("initCount is negative: %d", curInitCount)); } } + } + + /** + * Read a record from the database. Each field/value pair from the result will + * be stored in a HashMap. + * + * @param table + * The name of the table + * @param key + * The record key of the record to read. + * @param fields + * The list of fields to read, or null for all of them + * @param result + * A HashMap of field/value pairs for the result + * @return Zero on success, a non-zero error code on error + */ + @Override + public int read(String table, String key, Set<String> fields, + HashMap<String, ByteIterator> result) { + try { + Statement stmt; + Select.Builder selectBuilder; + + if (fields == null) { + selectBuilder = QueryBuilder.select().all(); + } else { + selectBuilder = QueryBuilder.select(); + for (String col : fields) { + ((Select.Selection) selectBuilder).column(col); + } + } + + stmt = selectBuilder.from(table).where(QueryBuilder.eq(YCSB_KEY, key)) + .limit(1); + stmt.setConsistencyLevel(readConsistencyLevel); + + if (debug) { + System.out.println(stmt.toString()); + } + + ResultSet rs = session.execute(stmt); + + if (rs.isExhausted()) { + return NOT_FOUND; + } - /** - * Read a record from the database. Each field/value pair from the result - * will be stored in a HashMap. - * - * @param table The name of the table - * @param key The record key of the record to read. - * @param fields The list of fields to read, or null for all of them - * @param result A HashMap of field/value pairs for the result - * @return Zero on success, a non-zero error code on error - */ - @Override - public int read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) { - try { - Statement stmt; - Select.Builder selectBuilder; - - if (fields == null) { - selectBuilder = QueryBuilder.select().all(); - } - else { - selectBuilder = QueryBuilder.select(); - for (String col : fields) { - ((Select.Selection) selectBuilder).column(col); - } - } - - stmt = selectBuilder.from(table).where(QueryBuilder.eq(YCSB_KEY, key)).limit(1); - stmt.setConsistencyLevel(readConsistencyLevel); - - if (_debug) { - System.out.println(stmt.toString()); - } - - ResultSet rs = session.execute(stmt); - - if (rs.isExhausted()) { - return NOT_FOUND; - } - - //Should be only 1 row - Row row = rs.one(); - ColumnDefinitions cd = row.getColumnDefinitions(); - - for (ColumnDefinitions.Definition def : cd) { - ByteBuffer val = row.getBytesUnsafe(def.getName()); - if (val != null) { - result.put(def.getName(), - new ByteArrayByteIterator(val.array())); - } - else { - result.put(def.getName(), null); - } - } - - return OK; - - } catch (Exception e) { - e.printStackTrace(); - System.out.println("Error reading key: " + key); - return ERR; + // Should be only 1 row + Row row = rs.one(); + ColumnDefinitions cd = row.getColumnDefinitions(); + + for (ColumnDefinitions.Definition def : cd) { + ByteBuffer val = row.getBytesUnsafe(def.getName()); + if (val != null) { + result.put(def.getName(), new ByteArrayByteIterator(val.array())); + } else { + result.put(def.getName(), null); } + } + + return OK; + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Error reading key: " + key); + return ERR; } - /** - * Perform a range scan for a set of records in the database. Each - * field/value pair from the result will be stored in a HashMap. - * - * Cassandra CQL uses "token" method for range scan which doesn't always - * yield intuitive results. - * - * @param table The name of the table - * @param startkey The record key of the first record to read. - * @param recordcount The number of records to read - * @param fields The list of fields to read, or null for all of them - * @param result A Vector of HashMaps, where each HashMap is a set - * field/value pairs for one record - * @return Zero on success, a non-zero error code on error - */ - @Override - public int scan(String table, String startkey, int recordcount, Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { - - try { - Statement stmt; - Select.Builder selectBuilder; - - if (fields == null) { - selectBuilder = QueryBuilder.select().all(); - } - else { - selectBuilder = QueryBuilder.select(); - for (String col : fields) { - ((Select.Selection) selectBuilder).column(col); - } - } - - stmt = selectBuilder.from(table); - - //The statement builder is not setup right for tokens. - // So, we need to build it manually. - String initialStmt = stmt.toString(); - StringBuilder scanStmt = new StringBuilder(); - scanStmt.append( - initialStmt.substring(0, initialStmt.length()-1)); - scanStmt.append(" WHERE "); - scanStmt.append(QueryBuilder.token(YCSB_KEY)); - scanStmt.append(" >= "); - scanStmt.append("token('"); - scanStmt.append(startkey); - scanStmt.append("')"); - scanStmt.append(" LIMIT "); - scanStmt.append(recordcount); - - stmt = new SimpleStatement(scanStmt.toString()); - stmt.setConsistencyLevel(readConsistencyLevel); - - if (_debug) { - System.out.println(stmt.toString()); - } - - ResultSet rs = session.execute(stmt); - - HashMap<String, ByteIterator> tuple; - while (!rs.isExhausted()) { - Row row = rs.one(); - tuple = new HashMap<String, ByteIterator> (); - - ColumnDefinitions cd = row.getColumnDefinitions(); - - for (ColumnDefinitions.Definition def : cd) { - ByteBuffer val = row.getBytesUnsafe(def.getName()); - if (val != null) { - tuple.put(def.getName(), - new ByteArrayByteIterator(val.array())); - } - else { - tuple.put(def.getName(), null); - } - } - - result.add(tuple); - } - - return OK; - - } catch (Exception e) { - e.printStackTrace(); - System.out.println("Error scanning with startkey: " + startkey); - return ERR; + } + + /** + * Perform a range scan for a set of records in the database. Each field/value + * pair from the result will be stored in a HashMap. + * + * Cassandra CQL uses "token" method for range scan which doesn't always yield + * intuitive results. + * + * @param table + * The name of the table + * @param startkey + * The record key of the first record to read. + * @param recordcount + * The number of records to read + * @param fields + * The list of fields to read, or null for all of them + * @param result + * A Vector of HashMaps, where each HashMap is a set field/value + * pairs for one record + * @return Zero on success, a non-zero error code on error + */ + @Override + public int scan(String table, String startkey, int recordcount, + Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { + + try { + Statement stmt; + Select.Builder selectBuilder; + + if (fields == null) { + selectBuilder = QueryBuilder.select().all(); + } else { + selectBuilder = QueryBuilder.select(); + for (String col : fields) { + ((Select.Selection) selectBuilder).column(col); } + } - } + stmt = selectBuilder.from(table); + + // The statement builder is not setup right for tokens. + // So, we need to build it manually. + String initialStmt = stmt.toString(); + StringBuilder scanStmt = new StringBuilder(); + scanStmt.append(initialStmt.substring(0, initialStmt.length() - 1)); + scanStmt.append(" WHERE "); + scanStmt.append(QueryBuilder.token(YCSB_KEY)); + scanStmt.append(" >= "); + scanStmt.append("token('"); + scanStmt.append(startkey); + scanStmt.append("')"); + scanStmt.append(" LIMIT "); + scanStmt.append(recordcount); + + stmt = new SimpleStatement(scanStmt.toString()); + stmt.setConsistencyLevel(readConsistencyLevel); + + if (debug) { + System.out.println(stmt.toString()); + } - /** - * Update a record in the database. Any field/value pairs in the specified - * values HashMap will be written into the record with the specified record - * key, overwriting any existing values with the same field name. - * - * @param table The name of the table - * @param key The record key of the record to write. - * @param values A HashMap of field/value pairs to update in the record - * @return Zero on success, a non-zero error code on error - */ - @Override - public int update(String table, String key, HashMap<String, ByteIterator> values) { - //Insert and updates provide the same functionality - return insert(table, key, values); - } + ResultSet rs = session.execute(stmt); + + HashMap<String, ByteIterator> tuple; + while (!rs.isExhausted()) { + Row row = rs.one(); + tuple = new HashMap<String, ByteIterator>(); - /** - * Insert a record in the database. Any field/value pairs in the specified - * values HashMap will be written into the record with the specified record - * key. - * - * @param table The name of the table - * @param key The record key of the record to insert. - * @param values A HashMap of field/value pairs to insert in the record - * @return Zero on success, a non-zero error code on error - */ - @Override - public int insert(String table, String key, HashMap<String, ByteIterator> values) { - - try { - Insert insertStmt = QueryBuilder.insertInto(table); - - //Add key - insertStmt.value(YCSB_KEY, key); - - //Add fields - for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { - Object value; - ByteIterator byteIterator = entry.getValue(); - value = byteIterator.toString(); - - insertStmt.value(entry.getKey(), value); - } - - insertStmt.setConsistencyLevel(writeConsistencyLevel).enableTracing(); - - if (_debug) { - System.out.println(insertStmt.toString()); - } - - ResultSet rs = session.execute(insertStmt); - - return OK; - } catch (Exception e) { - e.printStackTrace(); + ColumnDefinitions cd = row.getColumnDefinitions(); + + for (ColumnDefinitions.Definition def : cd) { + ByteBuffer val = row.getBytesUnsafe(def.getName()); + if (val != null) { + tuple.put(def.getName(), new ByteArrayByteIterator(val.array())); + } else { + tuple.put(def.getName(), null); + } } - return ERR; + result.add(tuple); + } + + return OK; + + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Error scanning with startkey: " + startkey); + return ERR; } - /** - * Delete a record from the database. - * - * @param table The name of the table - * @param key The record key of the record to delete. - * @return Zero on success, a non-zero error code on error - */ - @Override - public int delete(String table, String key) { + } + + /** + * Update a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key, overwriting any existing values with the same field name. + * + * @param table + * The name of the table + * @param key + * The record key of the record to write. + * @param values + * A HashMap of field/value pairs to update in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public int update(String table, String key, + HashMap<String, ByteIterator> values) { + // Insert and updates provide the same functionality + return insert(table, key, values); + } + + /** + * Insert a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key. + * + * @param table + * The name of the table + * @param key + * The record key of the record to insert. + * @param values + * A HashMap of field/value pairs to insert in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public int insert(String table, String key, + HashMap<String, ByteIterator> values) { + + try { + Insert insertStmt = QueryBuilder.insertInto(table); + + // Add key + insertStmt.value(YCSB_KEY, key); + + // Add fields + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { + Object value; + ByteIterator byteIterator = entry.getValue(); + value = byteIterator.toString(); + + insertStmt.value(entry.getKey(), value); + } - try { - Statement stmt; + insertStmt.setConsistencyLevel(writeConsistencyLevel).enableTracing(); - stmt = QueryBuilder.delete().from(table).where(QueryBuilder.eq(YCSB_KEY, key)); - stmt.setConsistencyLevel(writeConsistencyLevel); + if (debug) { + System.out.println(insertStmt.toString()); + } - if (_debug) { - System.out.println(stmt.toString()); - } + ResultSet rs = session.execute(insertStmt); - ResultSet rs = session.execute(stmt); + return OK; + } catch (Exception e) { + e.printStackTrace(); + } - return OK; - } catch (Exception e) { - e.printStackTrace(); - System.out.println("Error deleting key: " + key); - } + return ERR; + } + + /** + * Delete a record from the database. + * + * @param table + * The name of the table + * @param key + * The record key of the record to delete. + * @return Zero on success, a non-zero error code on error + */ + @Override + public int delete(String table, String key) { + + try { + Statement stmt; + + stmt = QueryBuilder.delete().from(table) + .where(QueryBuilder.eq(YCSB_KEY, key)); + stmt.setConsistencyLevel(writeConsistencyLevel); + + if (debug) { + System.out.println(stmt.toString()); + } - return ERR; + ResultSet rs = session.execute(stmt); + + return OK; + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Error deleting key: " + key); } + return ERR; + } + } diff --git a/cassandra2/src/main/java/com/yahoo/ycsb/db/package-info.java b/cassandra2/src/main/java/com/yahoo/ycsb/db/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..007f01dc54dd9d543b5380dff68880f23b1c009b --- /dev/null +++ b/cassandra2/src/main/java/com/yahoo/ycsb/db/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2014, Yahoo!, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + * The YCSB binding for <a href="http://cassandra.apache.org/">Cassandra</a> + * 2.1+ via CQL. + */ +package com.yahoo.ycsb.db; +