diff --git a/.gitignore b/.gitignore index 3394359620518d030b1c6d493149db60902097d8..989ab5feca253aa4d8ab1d9d9f9abf78fd76aad4 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,8 @@ target # ignore output files from testing output* + +# ignore standard eclipse +.project +.classpath +.settings diff --git a/accumulo/pom.xml b/accumulo/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..e74e2c1ce8b9eb99a23f7b7ce96cd5645c8508c1 --- /dev/null +++ b/accumulo/pom.xml @@ -0,0 +1,97 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>root</artifactId> + <version>0.1.4</version> + </parent> + <artifactId>accumulo-binding</artifactId> + <name>Accumulo DB Binding</name> + <dependencies> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-core</artifactId> + <version>${accumulo.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.thrift</groupId> + <artifactId>thrift</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.3.1</version> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.16</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.8.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + 
<version>0.20.203.0</version> + </dependency> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>5.1.14</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>15.0</version> + </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>core</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <version>${maven.assembly.version}</version> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <appendAssemblyId>false</appendAssemblyId> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + <repositories> + <repository> + <id>apache</id> + <url>http://repository.apache.org/snapshots</url> + </repository> + </repositories> +</project> diff --git a/accumulo/src/main/conf/accumulo.properties b/accumulo/src/main/conf/accumulo.properties new file mode 100644 index 0000000000000000000000000000000000000000..191ad416d25b2b14531925c1b76a5ba4cbf6870a --- /dev/null +++ b/accumulo/src/main/conf/accumulo.properties @@ -0,0 +1,44 @@ +# Copyright 2014 Cloudera, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. 
See the License for the specific language governing +# permissions and limitations under the License. See accompanying +# LICENSE file. +# +# Sample Accumulo configuration properties +# +# You may either set properties here or via the command line. +# + +# This will influence the keys we write +accumulo.columnFamily=YCSB + +# This should be set based on your Accumulo cluster +#accumulo.instanceName=ExampleInstance + +# Comma separated list of host:port tuples for the ZooKeeper quorum used +# by your Accumulo cluster +#accumulo.zooKeepers=zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181 + +# This user will need permissions on the table YCSB works against +#accumulo.username=ycsb +#accumulo.password=protectyaneck + +# Controls how long our client writer will wait to buffer more data +# measured in milliseconds +accumulo.batchWriterMaxLatency=30000 + +# Controls how much data our client will attempt to buffer before sending +# measured in bytes +accumulo.batchWriterSize=100000 + +# Controls how many worker threads our client will use to parallelize writes +accumulo.batchWriterThreads=1 diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java b/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java new file mode 100644 index 0000000000000000000000000000000000000000..0551849ce5e7e711d6eaa99aad5841158fb5ebc4 --- /dev/null +++ b/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java @@ -0,0 +1,435 @@ +package com.yahoo.ycsb.db; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.Vector; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import 
org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.CleanUp; +import org.apache.hadoop.io.Text; +import org.apache.zookeeper.KeeperException; + +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; + +public class AccumuloClient extends DB { + // Error code constants. + public static final int Ok = 0; + public static final int ServerError = -1; + public static final int HttpError = -2; + public static final int NoMatchingRecord = -3; + + private ZooKeeperInstance _inst; + private Connector _connector; + private String _table = ""; + private BatchWriter _bw = null; + private Text _colFam = new Text(""); + private Scanner _singleScanner = null; // A scanner for reads/deletes. 
+ private Scanner _scanScanner = null; // A scanner for use by scan() + + private static final String PC_PRODUCER = "producer"; + private static final String PC_CONSUMER = "consumer"; + private String _PC_FLAG = ""; + private ZKProducerConsumer.Queue q = null; + private static Hashtable<String,Long> hmKeyReads = null; + private static Hashtable<String,Integer> hmKeyNumReads = null; + private Random r = null; + + + @Override + public void init() throws DBException { + _colFam = new Text(getProperties().getProperty("accumulo.columnFamily")); + + _inst = new ZooKeeperInstance(getProperties().getProperty("accumulo.instanceName"), + getProperties().getProperty("accumulo.zooKeepers")); + try { + String principal = getProperties().getProperty("accumulo.username"); + AuthenticationToken token = new PasswordToken(getProperties().getProperty("accumulo.password")); + _connector = _inst.getConnector(principal, token); + } catch (AccumuloException e) { + throw new DBException(e); + } catch (AccumuloSecurityException e) { + throw new DBException(e); + } + + _PC_FLAG = getProperties().getProperty("accumulo.PC_FLAG","none"); + if (_PC_FLAG.equals(PC_PRODUCER) || _PC_FLAG.equals(PC_CONSUMER)) { + System.out.println("*** YCSB Client is "+_PC_FLAG); + String address = getProperties().getProperty("accumulo.PC_SERVER"); + String root = getProperties().getProperty("accumulo.PC_ROOT_IN_ZK"); + System.out.println("*** PC_INFO(server:"+address+";root="+root+")"); + q = new ZKProducerConsumer.Queue(address, root); + r = new Random(); + } + + if (_PC_FLAG.equals(PC_CONSUMER)) { + hmKeyReads = new Hashtable<String,Long>(); + hmKeyNumReads = new Hashtable<String,Integer>(); + keyNotification(null); + } + } + + + @Override + public void cleanup() throws DBException + { + try { + if (_bw != null) { + _bw.close(); + } + } catch (MutationsRejectedException e) { + throw new DBException(e); + } + CleanUp.shutdownNow(); + } + + /** + * Commonly repeated functionality: Before doing any operation, make 
sure + * we're working on the correct table. If not, open the correct one. + * + * @param table + */ + public void checkTable(String table) throws TableNotFoundException { + if (!_table.equals(table)) { + getTable(table); + } + } + + /** + * Called when the user specifies a table that isn't the same as the + * existing table. Connect to it and if necessary, close our current + * connection. + * + * @param table + */ + public void getTable(String table) throws TableNotFoundException { + if (_bw != null) { // Close the existing writer if necessary. + try { + _bw.close(); + } catch (MutationsRejectedException e) { + // Couldn't spit out the mutations we wanted. + // Ignore this for now. + } + } + + BatchWriterConfig bwc = new BatchWriterConfig(); + bwc.setMaxLatency(Long.parseLong(getProperties().getProperty("accumulo.batchWriterMaxLatency", "30000")), TimeUnit.MILLISECONDS); + bwc.setMaxMemory(Long.parseLong(getProperties().getProperty("accumulo.batchWriterSize", "100000"))); + bwc.setMaxWriteThreads(Integer.parseInt(getProperties().getProperty("accumulo.batchWriterThreads", "1"))); + + _bw = _connector.createBatchWriter(table, bwc); + + // Create our scanners + _singleScanner = _connector.createScanner(table, Authorizations.EMPTY); + _scanScanner = _connector.createScanner(table, Authorizations.EMPTY); + + _table = table; // Store the name of the table we have open. 
+ } + + /** + * Gets a scanner from Accumulo over one row + * + * @param row the row to scan + * @param fields the set of columns to scan + * @return an Accumulo {@link Scanner} bound to the given row and columns + */ + private Scanner getRow(Text row, Set<String> fields) + { + _singleScanner.clearColumns(); + _singleScanner.setRange(new Range(row)); + if (fields != null) { + for(String field:fields) + { + _singleScanner.fetchColumn(_colFam, new Text(field)); + } + } + return _singleScanner; + } + + @Override + public int read(String table, String key, Set<String> fields, + HashMap<String, ByteIterator> result) { + + try { + checkTable(table); + } catch (TableNotFoundException e) { + System.err.println("Error trying to connect to Accumulo table." + e); + return ServerError; + } + + try { + // Pick out the results we care about. + for (Entry<Key, Value> entry : getRow(new Text(key), null)) { + Value v = entry.getValue(); + byte[] buf = v.get(); + result.put(entry.getKey().getColumnQualifier().toString(), + new ByteArrayByteIterator(buf)); + } + } catch (Exception e) { + System.err.println("Error trying to reading Accumulo table" + key + e); + return ServerError; + } + return Ok; + + } + + @Override + public int scan(String table, String startkey, int recordcount, + Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { + try { + checkTable(table); + } catch (TableNotFoundException e) { + System.err.println("Error trying to connect to Accumulo table." + e); + return ServerError; + } + + // There doesn't appear to be a way to create a range for a given + // LENGTH. Just start and end keys. So we'll do this the hard way for now: + // Just make the end 'infinity' and only read as much as we need. + _scanScanner.clearColumns(); + _scanScanner.setRange(new Range(new Text(startkey), null)); + + // Batch size is how many key/values to try to get per call. 
Here, I'm + // guessing that the number of keys in a row is equal to the number of fields + // we're interested in. + // We try to fetch one more so as to tell when we've run out of fields. + + if (fields != null) { + // And add each of them as fields we want. + for(String field:fields) + { + _scanScanner.fetchColumn(_colFam, new Text(field)); + } + } else { + // If no fields are provided, we assume one column/row. + } + + String rowKey = ""; + HashMap<String, ByteIterator> currentHM = null; + int count = 0; + + // Begin the iteration. + for (Entry<Key, Value> entry : _scanScanner) { + // Check for a new row. + if (!rowKey.equals(entry.getKey().getRow().toString())) { + if (count++ == recordcount) { // Done reading the last row. + break; + } + rowKey = entry.getKey().getRow().toString(); + if (fields != null) { + // Initial Capacity for all keys. + currentHM = new HashMap<String, ByteIterator>(fields.size()); + } + else + { + // An empty result map. + currentHM = new HashMap<String, ByteIterator>(); + } + result.add(currentHM); + } + // Now add the key to the hashmap. + Value v = entry.getValue(); + byte[] buf = v.get(); + currentHM.put(entry.getKey().getColumnQualifier().toString(), new ByteArrayByteIterator(buf)); + } + + return Ok; + + } + + @Override + public int update(String table, String key, HashMap<String, ByteIterator> values) { + try { + checkTable(table); + } catch (TableNotFoundException e) { + System.err.println("Error trying to connect to Accumulo table." + e); + return ServerError; + } + + Mutation mutInsert = new Mutation(new Text(key)); + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { + mutInsert.put(_colFam, new Text(entry.getKey()), System + .currentTimeMillis(), + new Value(entry.getValue().toArray())); + } + + try { + _bw.addMutation(mutInsert); + // Distributed YCSB co-ordination: YCSB on a client produces the key to + // be stored in the shared queue in ZooKeeper. 
+ if (_PC_FLAG.equals(PC_PRODUCER)) { + if (r.nextFloat() < 0.01) + keyNotification(key); + } + } catch (MutationsRejectedException e) { + System.err.println("Error performing update."); + e.printStackTrace(); + return ServerError; + } + + + return Ok; + } + + @Override + public int insert(String table, String key, HashMap<String, ByteIterator> values) { + return update(table, key, values); + } + + @Override + public int delete(String table, String key) { + try { + checkTable(table); + } catch (TableNotFoundException e) { + System.err.println("Error trying to connect to Accumulo table." + e); + return ServerError; + } + + try { + deleteRow(new Text(key)); + } catch (RuntimeException e) { + System.err.println("Error performing delete."); + e.printStackTrace(); + return ServerError; + } + + return Ok; + } + + // These functions are adapted from RowOperations.java: + private void deleteRow(Text row) { + deleteRow(getRow(row, null)); + } + + + /** + * Deletes a row, given a Scanner of JUST that row + * + */ + private void deleteRow(Scanner scanner) { + Mutation deleter = null; + // iterate through the keys + for (Entry<Key,Value> entry : scanner) { + // create a mutation for the row + if (deleter == null) + deleter = new Mutation(entry.getKey().getRow()); + // the remove function adds the key with the delete flag set to true + deleter.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); + } + try { + _bw.addMutation(deleter); + } catch (MutationsRejectedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + + private void keyNotification(String key) { + + if (_PC_FLAG.equals(PC_PRODUCER)) { + try { + q.produce(key); + } catch (KeeperException e) { + + } catch (InterruptedException e) { + + } + } else { + //XXX: do something better to keep the loop going (while??) 
+ for (int i = 0; i < 10000000; i++) { + try { + String strKey = q.consume(); + + if ((hmKeyReads.containsKey(strKey) == false) && + (hmKeyNumReads.containsKey(strKey) == false)) { + hmKeyReads.put(strKey, new Long(System.currentTimeMillis())); + hmKeyNumReads.put(strKey, new Integer(1)); + } + + //YCSB Consumer will read the key that was fetched from the + //queue in ZooKeeper. + //(current way is kind of ugly but works, i think) + //TODO : Get table name from configuration or argument + String table = "usertable"; + HashSet<String> fields = new HashSet<String>(); + for (int j=0; j<9; j++) + fields.add("field"+j); + HashMap<String,ByteIterator> result = new HashMap<String,ByteIterator>(); + + int retval = read(table, strKey, fields, result); + //If the results are empty, the key is enqueued in Zookeeper + //and tried again, until the results are found. + if (result.size() == 0) { + q.produce(strKey); + int count = ((Integer)hmKeyNumReads.get(strKey)).intValue(); + hmKeyNumReads.put(strKey, new Integer(count+1)); + } + else { + if (((Integer)hmKeyNumReads.get(strKey)).intValue() > 1) { + long currTime = System.currentTimeMillis(); + long writeTime = ((Long)hmKeyReads.get(strKey)).longValue(); + System.out.println("Key="+strKey+ + //";StartSearch="+writeTime+ + //";EndSearch="+currTime+ + ";TimeLag="+(currTime-writeTime)); + } + } + + } catch (KeeperException e) { + + } catch (InterruptedException e) { + + } + } + } + + } + + public int presplit(String table, String[] keys) + { + TreeSet<Text> splits = new TreeSet<Text>(); + for (int i = 0;i < keys.length; i ++) + { + splits.add(new Text(keys[i])); + } + try { + _connector.tableOperations().addSplits(table, splits); + } catch (TableNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (AccumuloException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (AccumuloSecurityException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + 
return Ok; + } + +} diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java b/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..2daec51939a6e270914bb31eb204f268b86a98ae --- /dev/null +++ b/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java @@ -0,0 +1,122 @@ +package com.yahoo.ycsb.db; + +import java.io.IOException; +import java.util.List; + + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; + +// Implementing the PC Queue in ZooKeeper +// +public class ZKProducerConsumer implements Watcher { + + static ZooKeeper zk = null; + static Integer mutex; + + String root; + + // Constructor that takes the address of the ZK server + // + ZKProducerConsumer(String address) { + if(zk == null){ + try { + System.out.println("Starting ZK:"); + zk = new ZooKeeper(address, 3000, this); + mutex = new Integer(-1); + System.out.println("Finished starting ZK: " + zk); + } catch (IOException e) { + System.out.println(e.toString()); + zk = null; + } + } + //else mutex = new Integer(-1); + } + + synchronized public void process(WatchedEvent event) { + synchronized (mutex) { + //System.out.println("Process: " + event.getType()); + mutex.notify(); + } + } + + + static public class QueueElement { + public String key; + public long writeTime; + + QueueElement(String key, long writeTime) { + this.key = key; + this.writeTime = writeTime; + } + } + + // Producer-Consumer queue + static public class Queue extends ZKProducerConsumer { + + // Constructor of producer-consumer queue + Queue(String address, String name) { + super(address); + this.root = name; + // Create ZK node name + if (zk != null) { + try { + Stat s = zk.exists(root, false); + 
if (s == null) { + zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + } catch (KeeperException e) { + System.out + .println("Keeper exception when instantiating queue: " + + e.toString()); + } catch (InterruptedException e) { + System.out.println("Interrupted exception"); + } + } + } + + // Producer calls this method to insert the key in the queue + // + boolean produce(String key) throws KeeperException, InterruptedException{ + byte[] value; + value = key.getBytes(); + zk.create(root + "/key", value, + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + + return true; + } + + // Consumer calls this method to "wait" for the key to be available + // + String consume() throws KeeperException, InterruptedException { + String retvalue = null; + Stat stat = null; + + // Get the first element available + while (true) { + synchronized (mutex) { + List<String> list = zk.getChildren(root, true); + if (list.size() == 0) { + System.out.println("Going to wait"); + mutex.wait(); + } else { + String path = root+"/"+list.get(0); + byte[] b = zk.getData(path, false, stat); + retvalue = new String(b); + zk.delete(path, -1); + + return retvalue; + + } + } + } + } + } +} + diff --git a/bin/ycsb b/bin/ycsb index 7323a413f669b51a1cf68cfc516e50931237aea7..9be344b6aea14f86da83a2e2a4b04d9ab44dffec 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -24,6 +24,7 @@ COMMANDS = { } DATABASES = { + "accumulo" : "com.yahoo.ycsb.db.AccumuloClient", "basic" : "com.yahoo.ycsb.BasicDB", "cassandra-7" : "com.yahoo.ycsb.db.CassandraClient7", "cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8", diff --git a/pom.xml b/pom.xml index cfc2f938c8e0ffd122cf970e961b1ff6736af2fe..287c717823c0097da8d45eecf42211653c3b82f1 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,7 @@ <properties> <maven.assembly.version>2.2.1</maven.assembly.version> <hbase.version>0.92.1</hbase.version> + <accumulo.version>1.6.0</accumulo.version> <cassandra.version>0.7.0</cassandra.version> 
<infinispan.version>7.1.0.CR1</infinispan.version> <openjpa.jdbc.version>2.1.1</openjpa.jdbc.version> @@ -64,6 +65,7 @@ <module>core</module> <module>hbase</module> <module>hypertable</module> + <module>accumulo</module> <module>dynamodb</module> <module>elasticsearch</module> <!--<module>gemfire</module>-->