From 09dc27a2895bae5051d6428569c8e5dcd8af9aa2 Mon Sep 17 00:00:00 2001
From: Connor McCoy <connormccoy@google.com>
Date: Thu, 15 Oct 2015 09:23:18 -0700
Subject: [PATCH] [cassandra2] Add Cassandra 2 CQL client

---
 bin/ycsb                                      |   1 +
 cassandra2/pom.xml                            |  49 ++
 .../com/yahoo/ycsb/db/CassandraCQLClient.java | 433 ++++++++++++++++++
 pom.xml                                       |   2 +
 4 files changed, 485 insertions(+)
 create mode 100644 cassandra2/pom.xml
 create mode 100644 cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java

diff --git a/bin/ycsb b/bin/ycsb
index a3769f18..8036dc71 100755
--- a/bin/ycsb
+++ b/bin/ycsb
@@ -51,6 +51,7 @@ DATABASES = {
     "cassandra-8"  : "com.yahoo.ycsb.db.CassandraClient8",
     "cassandra-10" : "com.yahoo.ycsb.db.CassandraClient10",
     "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient",
+    "cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient",
     "couchbase"    : "com.yahoo.ycsb.db.CouchbaseClient",
     "dynamodb"     : "com.yahoo.ycsb.db.DynamoDBClient",
     "elasticsearch": "com.yahoo.ycsb.db.ElasticSearchClient",
diff --git a/cassandra2/pom.xml b/cassandra2/pom.xml
new file mode 100644
index 00000000..13facd0c
--- /dev/null
+++ b/cassandra2/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+Copyright (c) 2012-2015 YCSB contributors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you
+may not use this file except in compliance with the License. You
+may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+implied. See the License for the specific language governing
+permissions and limitations under the License. See accompanying
+LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>com.yahoo.ycsb</groupId>
+    <artifactId>binding-parent</artifactId>
+    <version>0.5.0-SNAPSHOT</version>
+    <relativePath>../binding-parent</relativePath>
+  </parent>
+
+  <artifactId>cassandra2-binding</artifactId>
+  <name>Cassandra 2.1+ DB Binding</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <!-- CQL driver -->
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-core</artifactId>
+      <version>${cassandra2.cql.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.yahoo.ycsb</groupId>
+      <artifactId>core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+  </dependencies>
+</project>
diff --git a/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java b/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java
new file mode 100644
index 00000000..0fa2b301
--- /dev/null
+++ b/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java
@@ -0,0 +1,433 @@
+/**
+ * Copyright (c) 2013-2015 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License. See accompanying LICENSE file.
+ *
+ * Submitted by Chrisjan Matser on 10/11/2010.
+ */
+package com.yahoo.ycsb.db;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import com.yahoo.ycsb.ByteArrayByteIterator;
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.DB;
+import com.yahoo.ycsb.DBException;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Cassandra 2.x CQL client.
+ *
+ * See {@code cassandra2/README.md} for details.
+ *
+ * @author cmatser
+ */
+public class CassandraCQLClient extends DB {
+
+    protected static Cluster cluster = null;
+    protected static Session session = null;
+
+    private static ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE;
+    private static ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE;
+
+    public static final int OK = 0;
+    public static final int ERR = -1;
+    public static final int NOT_FOUND = -3;
+
+    public static final String YCSB_KEY = "y_id";
+    public static final String KEYSPACE_PROPERTY = "cassandra.keyspace";
+    public static final String KEYSPACE_PROPERTY_DEFAULT = "ycsb";
+    public static final String USERNAME_PROPERTY = "cassandra.username";
+    public static final String PASSWORD_PROPERTY = "cassandra.password";
+
+    public static final String HOSTS_PROPERTY = "hosts";
+    public static final String PORT_PROPERTY = "port";
+
+
+    public static final String READ_CONSISTENCY_LEVEL_PROPERTY = "cassandra.readconsistencylevel";
+    public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
+    public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = "cassandra.writeconsistencylevel";
+    public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE";
+
+    /** Count the number of times initialized to teardown on the last {@link #cleanup()}. */
+    private static final AtomicInteger initCount = new AtomicInteger(0);
+
+    private static boolean _debug = false;
+
+    /**
+     * Initialize any state for this DB. Called once per DB instance; there is
+     * one DB instance per client thread.
+     */
+    @Override
+    public void init() throws DBException {
+
+        //Keep track of number of calls to init (for later cleanup)
+        initCount.incrementAndGet();
+
+        //Synchronized so that we only have a single
+        //  cluster/session instance for all the threads.
+        synchronized (initCount) {
+
+            //Check if the cluster has already been initialized
+            if (cluster != null) {
+                return;
+            }
+
+            try {
+
+                _debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false"));
+
+                String host = getProperties().getProperty(HOSTS_PROPERTY);
+                if (host == null) {
+                    throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", HOSTS_PROPERTY));
+                }
+                String hosts[] = host.split(",");
+                String port = getProperties().getProperty("port", "9042");
+                if (port == null) {
+                    throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", PORT_PROPERTY));
+                }
+
+                String username = getProperties().getProperty(USERNAME_PROPERTY);
+                String password = getProperties().getProperty(PASSWORD_PROPERTY);
+
+                String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY, KEYSPACE_PROPERTY_DEFAULT);
+
+                readConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
+                writeConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
+
+                // public void connect(String node) {}
+                if ((username != null) && !username.isEmpty()) {
+                    cluster = Cluster.builder()
+                        .withCredentials(username, password)
+                        .withPort(Integer.valueOf(port))
+                        .addContactPoints(hosts).build();
+                }
+                else {
+                    cluster = Cluster.builder()
+                        .withPort(Integer.valueOf(port))
+                        .addContactPoints(hosts).build();
+                }
+
+                //Update number of connections based on threads
+                int threadcount = Integer.parseInt(getProperties().getProperty("threadcount","1"));
+                cluster.getConfiguration().getPoolingOptions().setMaxConnectionsPerHost(HostDistance.LOCAL, threadcount);
+
+                //Set connection timeout 3min (default is 5s)
+                cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis(3*60*1000);
+                //Set read (execute) timeout 3min (default is 12s)
+                cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(3*60*1000);
+
+                Metadata metadata = cluster.getMetadata();
+                System.err.printf("Connected to cluster: %s\n", metadata.getClusterName());
+
+                for (Host discoveredHost : metadata.getAllHosts()) {
+                    System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n",
+                        discoveredHost.getDatacenter(),
+                        discoveredHost.getAddress(),
+                        discoveredHost.getRack());
+                }
+
+                session = cluster.connect(keyspace);
+
+            } catch (Exception e) {
+                throw new DBException(e);
+            }
+        }//synchronized
+    }
+
+    /**
+     * Cleanup any state for this DB. Called once per DB instance; there is one
+     * DB instance per client thread.
+     */
+    @Override
+    public void cleanup() throws DBException {
+      synchronized(initCount) {
+        final int curInitCount = initCount.decrementAndGet();
+        if (curInitCount <= 0) {
+          session.close();
+          cluster.close();
+          cluster = null;
+          session = null;
+        }
+        if (curInitCount < 0) {
+          // This should never happen.
+          throw new DBException(
+              String.format("initCount is negative: %d", curInitCount));
+        }
+      }
+    }
+
+    /**
+     * Read a record from the database. Each field/value pair from the result
+     * will be stored in a HashMap.
+     *
+     * @param table The name of the table
+     * @param key The record key of the record to read.
+     * @param fields The list of fields to read, or null for all of them
+     * @param result A HashMap of field/value pairs for the result
+     * @return Zero on success, a non-zero error code on error
+     */
+    @Override
+    public int read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
+        try {
+            Statement stmt;
+            Select.Builder selectBuilder;
+
+            if (fields == null) {
+                selectBuilder = QueryBuilder.select().all();
+            }
+            else {
+                selectBuilder = QueryBuilder.select();
+                for (String col : fields) {
+                    ((Select.Selection) selectBuilder).column(col);
+                }
+            }
+
+            stmt = selectBuilder.from(table).where(QueryBuilder.eq(YCSB_KEY, key)).limit(1);
+            stmt.setConsistencyLevel(readConsistencyLevel);
+
+            if (_debug) {
+                System.out.println(stmt.toString());
+            }
+
+            ResultSet rs = session.execute(stmt);
+
+            if (rs.isExhausted()) {
+              return NOT_FOUND;
+            }
+
+            //Should be only 1 row
+            Row row = rs.one();
+            ColumnDefinitions cd = row.getColumnDefinitions();
+
+            for (ColumnDefinitions.Definition def : cd) {
+                ByteBuffer val = row.getBytesUnsafe(def.getName());
+                if (val != null) {
+                    result.put(def.getName(),
+                        new ByteArrayByteIterator(val.array()));
+                }
+                else {
+                    result.put(def.getName(), null);
+                }
+            }
+
+            return OK;
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println("Error reading key: " + key);
+            return ERR;
+        }
+
+    }
+
+    /**
+     * Perform a range scan for a set of records in the database. Each
+     * field/value pair from the result will be stored in a HashMap.
+     *
+     * Cassandra CQL uses "token" method for range scan which doesn't always
+     * yield intuitive results.
+     *
+     * @param table The name of the table
+     * @param startkey The record key of the first record to read.
+     * @param recordcount The number of records to read
+     * @param fields The list of fields to read, or null for all of them
+     * @param result A Vector of HashMaps, where each HashMap is a set
+     * field/value pairs for one record
+     * @return Zero on success, a non-zero error code on error
+     */
+    @Override
+    public int scan(String table, String startkey, int recordcount, Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
+
+        try {
+            Statement stmt;
+            Select.Builder selectBuilder;
+
+            if (fields == null) {
+                selectBuilder = QueryBuilder.select().all();
+            }
+            else {
+                selectBuilder = QueryBuilder.select();
+                for (String col : fields) {
+                    ((Select.Selection) selectBuilder).column(col);
+                }
+            }
+
+            stmt = selectBuilder.from(table);
+
+            //The statement builder is not setup right for tokens.
+            //  So, we need to build it manually.
+            String initialStmt = stmt.toString();
+            StringBuilder scanStmt = new StringBuilder();
+            scanStmt.append(
+                initialStmt.substring(0, initialStmt.length()-1));
+            scanStmt.append(" WHERE ");
+            scanStmt.append(QueryBuilder.token(YCSB_KEY));
+            scanStmt.append(" >= ");
+            scanStmt.append("token('");
+            scanStmt.append(startkey);
+            scanStmt.append("')");
+            scanStmt.append(" LIMIT ");
+            scanStmt.append(recordcount);
+
+            stmt = new SimpleStatement(scanStmt.toString());
+            stmt.setConsistencyLevel(readConsistencyLevel);
+
+            if (_debug) {
+                System.out.println(stmt.toString());
+            }
+
+            ResultSet rs = session.execute(stmt);
+
+            HashMap<String, ByteIterator> tuple;
+            while (!rs.isExhausted()) {
+                Row row = rs.one();
+                tuple = new HashMap<String, ByteIterator> ();
+
+                ColumnDefinitions cd = row.getColumnDefinitions();
+
+                for (ColumnDefinitions.Definition def : cd) {
+                    ByteBuffer val = row.getBytesUnsafe(def.getName());
+                    if (val != null) {
+                        tuple.put(def.getName(),
+                            new ByteArrayByteIterator(val.array()));
+                    }
+                    else {
+                        tuple.put(def.getName(), null);
+                    }
+                }
+
+                result.add(tuple);
+            }
+
+            return OK;
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println("Error scanning with startkey: " + startkey);
+            return ERR;
+        }
+
+    }
+
+    /**
+     * Update a record in the database. Any field/value pairs in the specified
+     * values HashMap will be written into the record with the specified record
+     * key, overwriting any existing values with the same field name.
+     *
+     * @param table The name of the table
+     * @param key The record key of the record to write.
+     * @param values A HashMap of field/value pairs to update in the record
+     * @return Zero on success, a non-zero error code on error
+     */
+    @Override
+    public int update(String table, String key, HashMap<String, ByteIterator> values) {
+        //Insert and updates provide the same functionality
+        return insert(table, key, values);
+    }
+
+    /**
+     * Insert a record in the database. Any field/value pairs in the specified
+     * values HashMap will be written into the record with the specified record
+     * key.
+     *
+     * @param table The name of the table
+     * @param key The record key of the record to insert.
+     * @param values A HashMap of field/value pairs to insert in the record
+     * @return Zero on success, a non-zero error code on error
+     */
+    @Override
+    public int insert(String table, String key, HashMap<String, ByteIterator> values) {
+
+        try {
+            Insert insertStmt = QueryBuilder.insertInto(table);
+
+            //Add key
+            insertStmt.value(YCSB_KEY, key);
+
+            //Add fields
+            for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
+                Object value;
+                ByteIterator byteIterator = entry.getValue();
+                value = byteIterator.toString();
+
+                insertStmt.value(entry.getKey(), value);
+            }
+
+            insertStmt.setConsistencyLevel(writeConsistencyLevel).enableTracing();
+
+            if (_debug) {
+                System.out.println(insertStmt.toString());
+            }
+
+            ResultSet rs = session.execute(insertStmt);
+
+            return OK;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        return ERR;
+    }
+
+    /**
+     * Delete a record from the database.
+     *
+     * @param table The name of the table
+     * @param key The record key of the record to delete.
+     * @return Zero on success, a non-zero error code on error
+     */
+    @Override
+    public int delete(String table, String key) {
+
+        try {
+            Statement stmt;
+
+            stmt = QueryBuilder.delete().from(table).where(QueryBuilder.eq(YCSB_KEY, key));
+            stmt.setConsistencyLevel(writeConsistencyLevel);
+
+            if (_debug) {
+                System.out.println(stmt.toString());
+            }
+
+            ResultSet rs = session.execute(stmt);
+
+            return OK;
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println("Error deleting key: " + key);
+        }
+
+        return ERR;
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index 5e71d983..b08e1bb6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@ LICENSE file.
     <accumulo.version>1.6.0</accumulo.version>
     <cassandra.version>1.2.9</cassandra.version>
     <cassandra.cql.version>1.0.3</cassandra.cql.version>
+    <cassandra2.cql.version>2.1.8</cassandra2.cql.version>
     <gemfire.version>8.1.0</gemfire.version>
     <infinispan.version>7.2.2.Final</infinispan.version>
     <kudu.version>0.5.0</kudu.version>
@@ -101,6 +102,7 @@ LICENSE file.
     <module>accumulo</module>
     <module>aerospike</module>
     <module>cassandra</module>
+    <module>cassandra2</module>
     <module>couchbase</module>
     <module>distribution</module>
     <module>dynamodb</module>
-- 
GitLab