Skip to content
Snippets Groups Projects
Commit f75ffe81 authored by Michi Mutsuzaki's avatar Michi Mutsuzaki
Browse files

gh-59 Cassandra 1.0.6 (akkumar)

parent 83e4c0e8
No related branches found
No related tags found
No related merge requests found
......@@ -10,6 +10,9 @@ CASSANDRA_7_DIR=db/cassandra-0.7/lib
CASSANDRA_7_FILE=apache-cassandra-0.7.9-bin.tar.gz
CASSANDRA_8_DIR=db/cassandra-0.8/lib
CASSANDRA_8_FILE=apache-cassandra-0.8.7-bin.tar.gz
CASSANDRA_1X_VERSION=1.0.6
CASSANDRA_1X_DIR=db/cassandra-$(CASSANDRA_1X_VERSION)/lib
CASSANDRA_1X_FILE=apache-cassandra-$(CASSANDRA_1X_VERSION)-bin.tar.gz
HBASE_DIR=db/hbase/lib
HBASE_VERSION=0.90.5
HBASE_FILE=hbase-$(HBASE_VERSION).tar.gz
......@@ -35,6 +38,7 @@ download-database-deps: $(CASSANDRA_5_DIR)/$(CASSANDRA_5_FILE) \
$(CASSANDRA_6_DIR)/$(CASSANDRA_6_FILE) \
$(CASSANDRA_7_DIR)/$(CASSANDRA_7_FILE) \
$(CASSANDRA_8_DIR)/$(CASSANDRA_8_FILE) \
$(CASSANDRA_1X_DIR)/$(CASSANDRA_1X_FILE) \
$(HBASE_DIR)/$(HBASE_FILE) \
$(INFINISPAN_DIR)/$(INFINISPAN_FILE) \
$(MONGODB_DIR)/$(MONGODB_FILE) \
......@@ -63,6 +67,11 @@ $(CASSANDRA_8_DIR)/$(CASSANDRA_8_FILE) :
-O $(CASSANDRA_8_DIR)/$(CASSANDRA_8_FILE)
tar -C $(CASSANDRA_8_DIR) -zxf $(CASSANDRA_8_DIR)/$(CASSANDRA_8_FILE)
$(CASSANDRA_1X_DIR)/$(CASSANDRA_1X_FILE) :
wget http://archive.apache.org/dist/cassandra/$(CASSANDRA_1X_VERSION)/$(CASSANDRA_1X_FILE)\
-O $(CASSANDRA_1X_DIR)/$(CASSANDRA_1X_FILE)
tar -C $(CASSANDRA_1X_DIR) -zxf $(CASSANDRA_1X_DIR)/$(CASSANDRA_1X_FILE)
$(HBASE_DIR)/$(HBASE_FILE) :
wget http://archive.apache.org/dist/hbase/hbase-$(HBASE_VERSION)/$(HBASE_FILE)\
-O $(HBASE_DIR)/$(HBASE_FILE)
......
<?xml version="1.0" encoding="UTF-8"?>
<project name="ycsb" default="compile" basedir=".">
<property name="src.dir" value="src"/>
<property name="lib.dir" value="lib"/>
<property name="doc.dir" value="doc"/>
......@@ -26,56 +26,61 @@
</target>
<target name="dbcompile-cassandra-0.5" depends="compile">
<property name="db.dir" value="db/cassandra-0.5"/>
<antcall target="dbcompile"/>
<property name="db.dir" value="db/cassandra-0.5"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-cassandra-0.6" depends="compile">
<property name="db.dir" value="db/cassandra-0.6"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-cassandra-0.7" depends="compile">
<property name="db.dir" value="db/cassandra-0.7"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-cassandra-0.6" depends="compile">
<property name="db.dir" value="db/cassandra-0.6"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-cassandra-0.7" depends="compile">
<property name="db.dir" value="db/cassandra-0.7"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-cassandra-0.8" depends="compile">
<property name="db.dir" value="db/cassandra-0.8"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-hbase" depends="compile">
<property name="db.dir" value="db/hbase"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-mongodb" depends="compile">
<property name="db.dir" value="db/mongodb"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-voldemort" depends="compile">
<property name="db.dir" value="db/voldemort"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-jdbc" depends="compile">
<property name="db.dir" value="db/jdbc"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-cassandra-1.0.6" depends="compile">
<property name="db.dir" value="db/cassandra-1.0.6"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-hbase" depends="compile">
<property name="db.dir" value="db/hbase"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-mongodb" depends="compile">
<property name="db.dir" value="db/mongodb"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-voldemort" depends="compile">
<property name="db.dir" value="db/voldemort"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-jdbc" depends="compile">
<property name="db.dir" value="db/jdbc"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-redis" depends="compile">
<property name="db.dir" value="db/redis"/>
<antcall target="dbcompile"/>
</target>
<target name="dbcompile-mapkeeper" depends="compile">
<target name="dbcompile-mapkeeper" depends="compile">
<property name="db.dir" value="db/mapkeeper"/>
<antcall target="dbcompile"/>
</target>
</target>
<target name="compile">
<target name="compile">
<mkdir dir="${classes.dir}"/>
<javac includeantruntime="false" srcdir="${src.dir}" destdir="${classes.dir}" classpathref="build.classpath" deprecation="on">
<compilerarg value="-Xlint:unchecked"/>
......@@ -84,22 +89,22 @@
</target>
<target name="dbcompile">
<path id="dbclasspath">
<fileset dir="${db.dir}/lib" includes="**/*.jar"/>
<fileset file="build/ycsb.jar"/>
</path>
<mkdir dir="${classes.dir}"/>
<javac includeantruntime="false" srcdir="${db.dir}/src" destdir="${classes.dir}" classpathref="dbclasspath" deprecation="on">
<path id="dbclasspath">
<fileset dir="${db.dir}/lib" includes="**/*.jar"/>
<fileset file="build/ycsb.jar"/>
</path>
<mkdir dir="${classes.dir}"/>
<javac includeantruntime="false" srcdir="${db.dir}/src" destdir="${classes.dir}" classpathref="dbclasspath" deprecation="on">
<compilerarg value="-Xlint:unchecked"/>
</javac>
<antcall target="makejar"/>
</javac>
<antcall target="makejar"/>
</target>
<target name ="makejar" description="Create a jar for the YCSB project">
<jar jarfile="build/ycsb.jar" includes="**/*.class" basedir="${classes.dir}"/>
</target>
<target name ="makejar" description="Create a jar for the YCSB project">
<jar jarfile="build/ycsb.jar" includes="**/*.class" basedir="${classes.dir}"/>
</target>
<target name="clean">
<delete includeemptydirs="true">
<fileset dir="build" includes="**/*"/>
......@@ -107,7 +112,7 @@
</target>
<target name="doc">
<javadoc destdir="${doc.dir}/javadoc" packagenames="com.yahoo.ycsb,com.yahoo.ycsb.workloads,com.yahoo.ycsb.db,com.yahoo.ycsb.generator,com.yahoo.ycsb.measurements">
<javadoc destdir="${doc.dir}/javadoc" packagenames="com.yahoo.ycsb,com.yahoo.ycsb.workloads,com.yahoo.ycsb.db,com.yahoo.ycsb.generator,com.yahoo.ycsb.measurements">
<fileset dir="." defaultexcludes="yes">
<include name="src/**"/>
<include name="db/*/src/**"/>
......
......@@ -26,6 +26,7 @@
* gh-57 voldemort - enable nio connector (akkumar)
* gh-58 benchmarking with hbase 0.90.5 (akkumar)
* gh-55 VMware vFabric GemFire (sbawaska)
* gh-59 Cassandra 1.0.6 (akkumar)
0.1.3
* Voldemort binding (rsumbaly)
......
/**
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db;
import com.yahoo.ycsb.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Vector;
import java.util.Random;
import java.util.Properties;
import java.nio.ByteBuffer;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.cassandra.thrift.*;
//XXXX if we do replication, fix the consistency levels
/**
* Cassandra 1.0.6 client for YCSB framework
*/
public class CassandraClient10 extends DB
{
static Random random = new Random();
public static final int Ok = 0;
public static final int Error = -1;
public static final ByteBuffer emptyByteBuffer = ByteBuffer.wrap(new byte[0]);
public int ConnectionRetries;
public int OperationRetries;
public String column_family;
public static final String CONNECTION_RETRY_PROPERTY = "cassandra.connectionretries";
public static final String CONNECTION_RETRY_PROPERTY_DEFAULT = "300";
public static final String OPERATION_RETRY_PROPERTY = "cassandra.operationretries";
public static final String OPERATION_RETRY_PROPERTY_DEFAULT = "300";
public static final String USERNAME_PROPERTY = "cassandra.username";
public static final String PASSWORD_PROPERTY = "cassandra.password";
public static final String COLUMN_FAMILY_PROPERTY = "cassandra.columnfamily";
public static final String COLUMN_FAMILY_PROPERTY_DEFAULT = "data";
TTransport tr;
Cassandra.Client client;
boolean _debug = false;
String _table = "";
Exception errorexception = null;
List<Mutation> mutations = new ArrayList<Mutation>();
Map<String, List<Mutation>> mutationMap = new HashMap<String, List<Mutation>>();
Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
ColumnParent parent;
/**
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
public void init() throws DBException
{
String hosts = getProperties().getProperty("hosts");
if (hosts == null)
{
throw new DBException("Required property \"hosts\" missing for CassandraClient");
}
column_family = getProperties().getProperty(COLUMN_FAMILY_PROPERTY, COLUMN_FAMILY_PROPERTY_DEFAULT);
parent = new ColumnParent(column_family);
ConnectionRetries = Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY,
CONNECTION_RETRY_PROPERTY_DEFAULT));
OperationRetries = Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY,
OPERATION_RETRY_PROPERTY_DEFAULT));
String username = getProperties().getProperty(USERNAME_PROPERTY);
String password = getProperties().getProperty(PASSWORD_PROPERTY);
_debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false"));
String[] allhosts = hosts.split(",");
String myhost = allhosts[random.nextInt(allhosts.length)];
Exception connectexception = null;
for (int retry = 0; retry < ConnectionRetries; retry++)
{
tr = new TFramedTransport(new TSocket(myhost, 9160));
TProtocol proto = new TBinaryProtocol(tr);
client = new Cassandra.Client(proto);
try
{
tr.open();
connectexception = null;
break;
} catch (Exception e)
{
connectexception = e;
}
try
{
Thread.sleep(1000);
} catch (InterruptedException e)
{
}
}
if (connectexception != null)
{
System.err.println("Unable to connect to " + myhost + " after " + ConnectionRetries
+ " tries");
System.out.println("Unable to connect to " + myhost + " after " + ConnectionRetries
+ " tries");
throw new DBException(connectexception);
}
if (username != null && password != null)
{
Map<String,String> cred = new HashMap<String,String>();
cred.put("username", username);
cred.put("password", password);
AuthenticationRequest req = new AuthenticationRequest(cred);
try
{
client.login(req);
}
catch (Exception e)
{
throw new DBException(e);
}
}
}
/**
* Cleanup any state for this DB. Called once per DB instance; there is one DB
* instance per client thread.
*/
public void cleanup() throws DBException
{
tr.close();
}
/**
* Read a record from the database. Each field/value pair from the result will
* be stored in a HashMap.
*
* @param table
* The name of the table
* @param key
* The record key of the record to read.
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error
*/
public int read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result)
{
if (!_table.equals(table)) {
try
{
client.set_keyspace(table);
_table = table;
}
catch (Exception e)
{
e.printStackTrace();
e.printStackTrace(System.out);
return Error;
}
}
for (int i = 0; i < OperationRetries; i++)
{
try
{
SlicePredicate predicate;
if (fields == null)
{
predicate = new SlicePredicate().setSlice_range(new SliceRange(emptyByteBuffer, emptyByteBuffer, false, 1000000));
} else {
ArrayList<ByteBuffer> fieldlist = new ArrayList<ByteBuffer>(fields.size());
for (String s : fields)
{
fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8")));
}
predicate = new SlicePredicate().setColumn_names(fieldlist);
}
List<ColumnOrSuperColumn> results = client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent, predicate, ConsistencyLevel.ONE);
if (_debug)
{
System.out.print("Reading key: " + key);
}
Column column;
String name;
ByteIterator value;
for (ColumnOrSuperColumn oneresult : results)
{
column = oneresult.column;
name = new String(column.name.array(), column.name.position()+column.name.arrayOffset(), column.name.remaining());
value = new ByteArrayByteIterator(column.value.array(), column.value.position()+column.value.arrayOffset(), column.value.remaining());
result.put(name,value);
if (_debug)
{
System.out.print("(" + name + "=" + value + ")");
}
}
if (_debug)
{
System.out.println();
}
return Ok;
} catch (Exception e)
{
errorexception = e;
}
try
{
Thread.sleep(500);
} catch (InterruptedException e)
{
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Error;
}
/**
* Perform a range scan for a set of records in the database. Each field/value
* pair from the result will be stored in a HashMap.
*
* @param table
* The name of the table
* @param startkey
* The record key of the first record to read.
* @param recordcount
* The number of records to read
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one record
* @return Zero on success, a non-zero error code on error
*/
public int scan(String table, String startkey, int recordcount, Set<String> fields,
Vector<HashMap<String, ByteIterator>> result)
{
if (!_table.equals(table)) {
try
{
client.set_keyspace(table);
_table = table;
}
catch (Exception e)
{
e.printStackTrace();
e.printStackTrace(System.out);
return Error;
}
}
for (int i = 0; i < OperationRetries; i++)
{
try
{
SlicePredicate predicate;
if (fields == null)
{
predicate = new SlicePredicate().setSlice_range(new SliceRange(emptyByteBuffer, emptyByteBuffer, false, 1000000));
} else {
ArrayList<ByteBuffer> fieldlist = new ArrayList<ByteBuffer>(fields.size());
for (String s : fields)
{
fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8")));
}
predicate = new SlicePredicate().setColumn_names(fieldlist);
}
KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8")).setEnd_key(new byte[] {}).setCount(recordcount);
List<KeySlice> results = client.get_range_slices(parent, predicate, kr, ConsistencyLevel.ONE);
if (_debug)
{
System.out.println("Scanning startkey: " + startkey);
}
HashMap<String, ByteIterator> tuple;
for (KeySlice oneresult : results)
{
tuple = new HashMap<String, ByteIterator>();
Column column;
String name;
ByteIterator value;
for (ColumnOrSuperColumn onecol : oneresult.columns)
{
column = onecol.column;
name = new String(column.name.array(), column.name.position()+column.name.arrayOffset(), column.name.remaining());
value = new ByteArrayByteIterator(column.value.array(), column.value.position()+column.value.arrayOffset(), column.value.remaining());
tuple.put(name, value);
if (_debug)
{
System.out.print("(" + name + "=" + value + ")");
}
}
result.add(tuple);
if (_debug)
{
System.out.println();
}
}
return Ok;
} catch (Exception e)
{
errorexception = e;
}
try
{
Thread.sleep(500);
} catch (InterruptedException e)
{
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Error;
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table
* The name of the table
* @param key
* The record key of the record to write.
* @param values
* A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error
*/
public int update(String table, String key, HashMap<String, ByteIterator> values)
{
return insert(table, key, values);
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key.
*
* @param table
* The name of the table
* @param key
* The record key of the record to insert.
* @param values
* A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error
*/
public int insert(String table, String key, HashMap<String, ByteIterator> values)
{
if (!_table.equals(table)) {
try
{
client.set_keyspace(table);
_table = table;
}
catch (Exception e)
{
e.printStackTrace();
e.printStackTrace(System.out);
return Error;
}
}
for (int i = 0; i < OperationRetries; i++)
{
if (_debug)
{
System.out.println("Inserting key: " + key);
}
try
{
ByteBuffer wrappedKey = ByteBuffer.wrap(key.getBytes("UTF-8"));
Column col;
ColumnOrSuperColumn column;
for (Map.Entry<String, ByteIterator> entry : values.entrySet())
{
col = new Column();
col.setName(ByteBuffer.wrap(entry.getKey().getBytes("UTF-8")));
col.setValue(ByteBuffer.wrap(entry.getValue().toArray()));
col.setTimestamp(System.currentTimeMillis());
column = new ColumnOrSuperColumn();
column.setColumn(col);
mutations.add(new Mutation().setColumn_or_supercolumn(column));
}
mutationMap.put(column_family, mutations);
record.put(wrappedKey, mutationMap);
client.batch_mutate(record, ConsistencyLevel.ONE);
mutations.clear();
mutationMap.clear();
record.clear();
return Ok;
} catch (Exception e)
{
errorexception = e;
}
try
{
Thread.sleep(500);
} catch (InterruptedException e)
{
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Error;
}
/**
* Delete a record from the database.
*
* @param table
* The name of the table
* @param key
* The record key of the record to delete.
* @return Zero on success, a non-zero error code on error
*/
public int delete(String table, String key)
{
if (!_table.equals(table)) {
try
{
client.set_keyspace(table);
_table = table;
}
catch (Exception e)
{
e.printStackTrace();
e.printStackTrace(System.out);
return Error;
}
}
for (int i = 0; i < OperationRetries; i++)
{
try
{
client.remove(ByteBuffer.wrap(key.getBytes("UTF-8")),
new ColumnPath(column_family),
System.currentTimeMillis(),
ConsistencyLevel.ONE);
if (_debug)
{
System.out.println("Delete key: " + key);
}
return Ok;
} catch (Exception e)
{
errorexception = e;
}
try
{
Thread.sleep(500);
} catch (InterruptedException e)
{
}
}
errorexception.printStackTrace();
errorexception.printStackTrace(System.out);
return Error;
}
public static void main(String[] args)
{
CassandraClient10 cli = new CassandraClient10();
Properties props = new Properties();
props.setProperty("hosts", args[0]);
cli.setProperties(props);
try
{
cli.init();
} catch (Exception e)
{
e.printStackTrace();
System.exit(0);
}
HashMap<String, ByteIterator> vals = new HashMap<String, ByteIterator>();
vals.put("age", new StringByteIterator("57"));
vals.put("middlename", new StringByteIterator("bradley"));
vals.put("favoritecolor", new StringByteIterator("blue"));
int res = cli.insert("usertable", "BrianFrankCooper", vals);
System.out.println("Result of insert: " + res);
HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>();
HashSet<String> fields = new HashSet<String>();
fields.add("middlename");
fields.add("age");
fields.add("favoritecolor");
res = cli.read("usertable", "BrianFrankCooper", null, result);
System.out.println("Result of read: " + res);
for (String s : result.keySet())
{
System.out.println("[" + s + "]=[" + result.get(s) + "]");
}
res = cli.delete("usertable", "BrianFrankCooper");
System.out.println("Result of delete: " + res);
}
/*
* public static void main(String[] args) throws TException,
* InvalidRequestException, UnavailableException,
* UnsupportedEncodingException, NotFoundException {
*
*
*
* String key_user_id = "1";
*
*
*
*
* client.insert("Keyspace1", key_user_id, new ColumnPath("Standard1", null,
* "age".getBytes("UTF-8")), "24".getBytes("UTF-8"), timestamp,
* ConsistencyLevel.ONE);
*
*
* // read single column ColumnPath path = new ColumnPath("Standard1", null,
* "name".getBytes("UTF-8"));
*
* System.out.println(client.get("Keyspace1", key_user_id, path,
* ConsistencyLevel.ONE));
*
*
* // read entire row SlicePredicate predicate = new SlicePredicate(null, new
* SliceRange(new byte[0], new byte[0], false, 10));
*
* ColumnParent parent = new ColumnParent("Standard1", null);
*
* List<ColumnOrSuperColumn> results = client.get_slice("Keyspace1",
* key_user_id, parent, predicate, ConsistencyLevel.ONE);
*
* for (ColumnOrSuperColumn result : results) {
*
* Column column = result.column;
*
* System.out.println(new String(column.name, "UTF-8") + " -> " + new
* String(column.value, "UTF-8"));
*
* }
*
*
*
*
* }
*/
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment