From ff4342d86c76f2d96aed17458b71c504b0bc1568 Mon Sep 17 00:00:00 2001
From: Jonathan M Hsieh <jon@cloudera.com>
Date: Fri, 19 Jul 2013 23:45:36 -0700
Subject: [PATCH] Accumulo db driver for ycsb.

Based on YCSB++

http://www.pdl.cmu.edu/ycsb++/
---
 accumulo/pom.xml                              |  96 ++++
 .../com/yahoo/ycsb/db/AccumuloClient.java     | 437 ++++++++++++++++++
 .../com/yahoo/ycsb/db/ZKProducerConsumer.java | 122 +++++
 bin/ycsb                                      |   1 +
 pom.xml                                       |   2 +
 5 files changed, 658 insertions(+)
 create mode 100644 accumulo/pom.xml
 create mode 100644 accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java
 create mode 100644 accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java

diff --git a/accumulo/pom.xml b/accumulo/pom.xml
new file mode 100644
index 00000000..00756707
--- /dev/null
+++ b/accumulo/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>com.yahoo.ycsb</groupId>
+    <artifactId>root</artifactId>
+    <version>0.1.4</version>
+  </parent>
+  <artifactId>accumulo-binding</artifactId>
+  <name>Accumulo DB Binding</name>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-core</artifactId>
+      <version>${accumulo.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>thrift</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.3.1</version>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.16</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.8.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-core</artifactId>
+      <version>0.20.203.0</version>
+    </dependency>
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>5.1.14</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>r08</version>
+    </dependency>
+    <dependency>
+      <groupId>com.yahoo.ycsb</groupId>
+      <artifactId>core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>${maven.assembly.version}</version>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+          <appendAssemblyId>false</appendAssemblyId>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <repositories>
+    <repository>
+      <id>apache</id>
+      <url>http://repository.apache.org/snapshots</url>
+    </repository>
+  </repositories>
+</project>
diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java b/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java
new file mode 100644
index 00000000..46e60b30
--- /dev/null
+++ b/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java
@@ -0,0 +1,437 @@
+package com.yahoo.ycsb.db;
+
+import com.yahoo.ycsb.ByteIterator;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.Vector;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+import com.yahoo.ycsb.DB;
+import com.yahoo.ycsb.DBException;
+import com.yahoo.ycsb.ByteArrayByteIterator;
+
+public class AccumuloClient extends DB {
+	// Error code constants.
+	public static final int Ok = 0;
+	public static final int ServerError = -1;
+	public static final int HttpError = -2;
+	public static final int NoMatchingRecord = -3;
+
+	private Connector _connector;
+	private String _table = "";
+	private BatchWriter _bw = null;
+	private Text _colFam = new Text("");
+	private Scanner _singleScanner = null;    // A scanner for reads/deletes.
+	private Scanner _scanScanner = null;      // A scanner for use by scan()
+
+	private static final String PC_PRODUCER = "producer";
+	private static final String PC_CONSUMER = "consumer";
+	private String _PC_FLAG = "";
+	private ZKProducerConsumer.Queue q = null;
+	private static Hashtable<String,Long> hmKeyReads = null;
+	private static Hashtable<String,Integer> hmKeyNumReads = null;
+	private Random r = null;
+
+
+	@Override
+	public void init() {
+		_colFam = new Text(getProperties().getProperty("accumulo.columnFamily"));
+
+		Instance inst = new ZooKeeperInstance(getProperties().getProperty("accumulo.instanceName"),
+				getProperties().getProperty("accumulo.zooKeepers"));
+		try {
+			_connector = inst.getConnector(getProperties().getProperty("accumulo.username"), 
+					getProperties().getProperty("accumulo.password"));
+		} catch (AccumuloException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (AccumuloSecurityException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+
+		_PC_FLAG = getProperties().getProperty("accumulo.PC_FLAG","none");
+		if (_PC_FLAG.equals(PC_PRODUCER) || _PC_FLAG.equals(PC_CONSUMER)) {
+			System.out.println("*** YCSB Client is "+_PC_FLAG);
+			String address = getProperties().getProperty("accumulo.PC_SERVER");
+			String root = getProperties().getProperty("accumulo.PC_ROOT_IN_ZK");
+			System.out.println("*** PC_INFO(server:"+address+";root="+root+")");
+			q = new ZKProducerConsumer.Queue(address, root);
+			r = new Random();
+		}
+
+		if (_PC_FLAG.equals(PC_CONSUMER)) {
+			hmKeyReads = new Hashtable<String,Long>();
+			hmKeyNumReads = new Hashtable<String,Integer>();
+			keyNotification(null);
+		}
+	}
+
+
+	public void cleanup() throws DBException
+	{
+		try {
+			if (_bw != null) {
+				try {
+					//Thread.sleep(60000);
+				} catch (Exception e) {
+				}
+				_bw.close();
+			}
+		} catch (MutationsRejectedException e) {
+			throw new DBException(e);
+		}	
+	}
+
+	/**
+	 * Commonly repeated functionality: Before doing any operation, make sure
+	 * we're working on the correct table. If not, open the correct one.
+	 * 
+	 * @param table
+	 */
+	public void checkTable(String table) throws TableNotFoundException {
+		if (!_table.equals(table)) {
+			getTable(table);
+		}
+	}
+
+	/**
+	 * Called when the user specifies a table that isn't the same as the
+	 * existing table. Connect to it and if necessary, close our current
+	 * connection.
+	 * 
+	 * @param table
+	 */
+	public void getTable(String table) throws TableNotFoundException {
+		if (_bw != null) { // Close the existing writer if necessary.
+			try {
+				_bw.close();
+			} catch (MutationsRejectedException e) {
+				// Couldn't spit out the mutations we wanted.
+				// Ignore this for now.
+			}
+		}
+
+		long bwSize = Long.parseLong(getProperties().getProperty("accumulo.batchWriterSize", "100000"));
+		long bwMaxLatency = Long.parseLong(getProperties().getProperty("accumulo.batchWriterMaxLatency", "30000"));
+		_bw = _connector.createBatchWriter(table, bwSize, bwMaxLatency, 1);
+		// Create our scanners
+		_singleScanner = _connector.createScanner(table, Constants.NO_AUTHS);
+		_scanScanner = _connector.createScanner(table, Constants.NO_AUTHS);
+
+		_table = table; // Store the name of the table we have open.
+	}
+
+	/**
+	 * Gets a scanner from Accumulo over one row
+	 * 
+	 * @param row
+	 * @return
+	 */
+	private Scanner getRow(Text row, Set<String> fields) 
+	{
+		_singleScanner.clearColumns();
+		_singleScanner.setRange(new Range(row));
+		if (fields != null) {
+			for(String field:fields)
+			{
+				_singleScanner.fetchColumn(_colFam, new Text(field));
+			}
+		}
+		return _singleScanner;
+	}
+
+	@Override
+	public int read(String table, String key, Set<String> fields,
+			HashMap<String, ByteIterator> result) {
+
+		try {
+			checkTable(table);
+		} catch (TableNotFoundException e) {
+			System.err.println("Error trying to connect to Accumulo table." + e);
+			return ServerError;
+		}
+
+		try {
+			// Pick out the results we care about.
+			for (Entry<Key, Value> entry : getRow(new Text(key), null)) {
+			    Value v = entry.getValue();
+			    byte[] buf = new byte[v.getSize()];
+			    v.copy(buf);
+			    result.put(entry.getKey().getColumnQualifier().toString(), 
+				       new ByteArrayByteIterator(buf));
+			}
+		} catch (Exception e) {
+			System.err.println("Error trying to reading Accumulo table" + key + e);
+			return ServerError;
+		}
+		return 0;
+
+	}
+
+	@Override
+	public int scan(String table, String startkey, int recordcount,
+			Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
+		try {
+			checkTable(table);
+		} catch (TableNotFoundException e) {
+			System.err.println("Error trying to connect to Accumulo table." + e);
+			return ServerError;
+		}
+
+		// There doesn't appear to be a way to create a range for a given
+		// LENGTH. Just start and end keys. So we'll do this the hard way for now: 
+		// Just make the end 'infinity' and only read as much as we need. 
+		_scanScanner.clearColumns();
+		_scanScanner.setRange(new Range(new Text(startkey), null));
+
+		// Batch size is how many key/values to try to get per call. Here, I'm
+		// guessing that the number of keys in a row is equal to the number of fields 
+		// we're interested in.
+		// We try to fetch one more so as to tell when we've run out of fields.
+
+		if (fields != null) {
+			// And add each of them as fields we want.
+			for(String field:fields)
+			{
+				_scanScanner.fetchColumn(_colFam, new Text(field));
+			}
+		} else {
+			// If no fields are provided, we assume one column/row.
+		}
+
+		String rowKey = "";
+		HashMap<String, ByteIterator> currentHM = null;
+		int count = 0;
+
+		// Begin the iteration.
+		for (Entry<Key, Value> entry : _scanScanner) {
+			// Check for a new row.
+			if (!rowKey.equals(entry.getKey().getRow().toString())) {
+				if (count++ == recordcount) { // Done reading the last row.
+					break;
+				}
+				rowKey = entry.getKey().getRow().toString();
+				if (fields != null) {
+					// Initial Capacity for all keys.
+					currentHM = new HashMap<String, ByteIterator>(fields.size()); 
+				}
+				else
+				{
+					// An empty result map.
+					currentHM = new HashMap<String, ByteIterator>();
+				}
+				result.add(currentHM);
+			}
+			// Now add the key to the hashmap.
+			Value v = entry.getValue();
+			byte[] buf = new byte[v.getSize()];
+			currentHM.put(entry.getKey().getColumnQualifier().toString(), new ByteArrayByteIterator(buf));
+		}
+
+		return 0;
+
+	}
+
+	@Override
+	public int update(String table, String key, HashMap<String, ByteIterator> values) {
+		try {
+			checkTable(table);
+		} catch (TableNotFoundException e) {
+			System.err.println("Error trying to connect to Accumulo table." + e);
+			return ServerError;
+		}
+
+		Mutation mutInsert = new Mutation(new Text(key));
+		for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
+			mutInsert.put(_colFam, new Text(entry.getKey()), System
+					.currentTimeMillis(),
+					new Value(entry.getValue().toArray()));
+		}
+
+		try {
+			_bw.addMutation(mutInsert);
+			// Distributed YCSB co-ordination: YCSB on a client produces the key to 
+			// be stored in the shared queue in ZooKeeper.
+			if (_PC_FLAG.equals(PC_PRODUCER)) {
+				if (r.nextFloat() < 0.01)
+					keyNotification(key);
+			}
+		} catch (MutationsRejectedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+
+
+		return 0;
+	}
+
+	@Override
+	public int insert(String table, String key, HashMap<String, ByteIterator> values) {
+		return update(table, key, values);
+	}
+
+	@Override
+	public int delete(String table, String key) {
+		try {
+			checkTable(table);
+		} catch (TableNotFoundException e) {
+			System.err.println("Error trying to connect to Accumulo table." + e);
+			return ServerError;
+		}
+
+
+		try {
+			deleteRow(new Text(key));
+		} catch (SecurityException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (TableNotFoundException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (Exception e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+
+
+		return 0;
+	}
+
+	// These functions are adapted from RowOperations.java:
+	private void deleteRow(Text row)  throws TableNotFoundException {
+		deleteRow(getRow(row, null));
+	}
+
+
+	/**
+	 * Deletes a row, given a Scanner of JUST that row
+	 * 
+	 */
+	private void deleteRow(Scanner scanner) {
+		Mutation deleter = null;
+		// iterate through the keys
+		for (Entry<Key,Value> entry : scanner) {
+			// create a mutation for the row
+			if (deleter == null)
+				deleter = new Mutation(entry.getKey().getRow());
+			// the remove function adds the key with the delete flag set to true
+			deleter.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
+		}
+		try {
+			_bw.addMutation(deleter);
+		} catch (MutationsRejectedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+
+	}
+
+	private void keyNotification(String key) {
+
+		if (_PC_FLAG.equals(PC_PRODUCER)) {
+			try {
+				q.produce(key);
+			} catch (KeeperException e) {
+
+			} catch (InterruptedException e) {
+
+			}
+		} else {
+			//XXX: do something better to keep the loop going (while??)
+			for (int i = 0; i < 10000000; i++) {
+				try {
+					String strKey = q.consume();
+			
+					if ((hmKeyReads.containsKey(strKey) == false) && 
+							(hmKeyNumReads.containsKey(strKey) == false)) { 
+						hmKeyReads.put(strKey, new Long(System.currentTimeMillis()));
+						hmKeyNumReads.put(strKey, new Integer(1));
+					}
+
+					//YCSB Consumer will read the key that was fetched from the 
+					//queue in ZooKeeper.
+					//(current way is kind of ugly but works, i think)
+					//TODO : Get table name from configuration or argument
+					String table = "usertable";
+					HashSet<String> fields = new HashSet<String>();
+					for (int j=0; j<9; j++)
+						fields.add("field"+j);
+					HashMap<String,ByteIterator> result = new HashMap<String,ByteIterator>();
+
+					int retval = read(table, strKey, fields, result);
+					//If the results are empty, the key is enqueued in Zookeeper
+					//and tried again, until the results are found.
+					if (result.size() == 0) { 
+						q.produce(strKey);
+						int count = ((Integer)hmKeyNumReads.get(strKey)).intValue(); 
+						hmKeyNumReads.put(strKey, new Integer(count+1));
+					}
+					else {
+						if (((Integer)hmKeyNumReads.get(strKey)).intValue() > 1) { 
+							long currTime = System.currentTimeMillis();
+							long writeTime = ((Long)hmKeyReads.get(strKey)).longValue();
+							System.out.println("Key="+strKey+
+									//";StartSearch="+writeTime+
+									//";EndSearch="+currTime+
+									";TimeLag="+(currTime-writeTime));
+						}
+					}
+
+				} catch (KeeperException e) {
+
+				} catch (InterruptedException e) {
+
+				}
+			}
+		}
+
+	}
+
+	public int presplit(String table, String[] keys)
+	{
+		TreeSet<Text> splits = new TreeSet<Text>();
+		for (int i = 0;i < keys.length; i ++)
+		{
+			splits.add(new Text(keys[i]));
+		}
+		try {
+			_connector.tableOperations().addSplits(table, splits);
+		} catch (TableNotFoundException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (AccumuloException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (AccumuloSecurityException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} 
+		return 0;
+	}
+
+}
diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java b/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java
new file mode 100644
index 00000000..2daec519
--- /dev/null
+++ b/accumulo/src/main/java/com/yahoo/ycsb/db/ZKProducerConsumer.java
@@ -0,0 +1,122 @@
+package com.yahoo.ycsb.db;
+
+import java.io.IOException;
+import java.util.List;
+
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+// Implementing the PC Queue in ZooKeeper
+//
+public class ZKProducerConsumer implements Watcher {
+
+    static ZooKeeper zk = null;
+    static Integer mutex;
+
+    String root;
+
+    // Constructor that takes tha address of the ZK server
+    //
+    ZKProducerConsumer(String address) {
+        if(zk == null){
+            try {
+                System.out.println("Starting ZK:");
+                zk = new ZooKeeper(address, 3000, this);
+                mutex = new Integer(-1);
+                System.out.println("Finished starting ZK: " + zk);
+            } catch (IOException e) {
+                System.out.println(e.toString());
+                zk = null;
+            }
+        }
+        //else mutex = new Integer(-1);
+    }
+
+    synchronized public void process(WatchedEvent event) {
+        synchronized (mutex) {
+            //System.out.println("Process: " + event.getType());
+            mutex.notify();
+        }
+    }
+
+
+    static public class QueueElement {
+        public String key;
+        public long writeTime;
+
+        QueueElement(String key, long writeTime) {
+            this.key = key;
+            this.writeTime = writeTime;
+        }
+    }
+
+    // Producer-Consumer queue
+    static public class Queue extends ZKProducerConsumer {
+
+        // Constructor of producer-consumer queue
+        Queue(String address, String name) {
+            super(address);
+            this.root = name;
+            // Create ZK node name
+            if (zk != null) {
+                try {
+                    Stat s = zk.exists(root, false);
+                    if (s == null) {
+                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                                CreateMode.PERSISTENT);
+                    }
+                } catch (KeeperException e) {
+                    System.out
+                            .println("Keeper exception when instantiating queue: "
+                                    + e.toString());
+                } catch (InterruptedException e) {
+                    System.out.println("Interrupted exception");
+                }
+            }
+        }
+
+        // Producer calls this method to insert the key in the queue
+        //
+        boolean produce(String key) throws KeeperException, InterruptedException{
+            byte[] value;
+            value = key.getBytes();
+            zk.create(root + "/key", value, 
+                      Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+
+            return true;
+        }
+
+        // Consumer calls this method to "wait" for the key to the available
+        //
+        String consume() throws KeeperException, InterruptedException {
+            String retvalue = null;
+            Stat stat = null;
+
+            // Get the first element available
+            while (true) {
+                synchronized (mutex) {
+                    List<String> list = zk.getChildren(root, true);
+                    if (list.size() == 0) {
+                        System.out.println("Going to wait");
+                        mutex.wait();
+                    } else {
+                        String path = root+"/"+list.get(0);
+                        byte[] b = zk.getData(path, false, stat);
+                        retvalue = new String(b);
+                        zk.delete(path, -1);
+
+                        return retvalue;
+
+                    }
+                }
+            }
+        }
+    }
+}
+        
diff --git a/bin/ycsb b/bin/ycsb
index 7323a413..9be344b6 100755
--- a/bin/ycsb
+++ b/bin/ycsb
@@ -24,6 +24,7 @@ COMMANDS = {
 }
 
 DATABASES = {
+    "accumulo"     : "com.yahoo.ycsb.db.AccumuloClient",
     "basic"        : "com.yahoo.ycsb.BasicDB",
     "cassandra-7"  : "com.yahoo.ycsb.db.CassandraClient7",
     "cassandra-8"  : "com.yahoo.ycsb.db.CassandraClient8",
diff --git a/pom.xml b/pom.xml
index cfc2f938..60417262 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,7 @@
   <properties>
     <maven.assembly.version>2.2.1</maven.assembly.version>
     <hbase.version>0.92.1</hbase.version>
+    <accumulo.version>1.4.3</accumulo.version>
     <cassandra.version>0.7.0</cassandra.version>
     <infinispan.version>7.1.0.CR1</infinispan.version>
     <openjpa.jdbc.version>2.1.1</openjpa.jdbc.version>
@@ -64,6 +65,7 @@
     <module>core</module>
     <module>hbase</module>
     <module>hypertable</module>
+    <module>accumulo</module>
     <module>dynamodb</module>
     <module>elasticsearch</module>
     <!--<module>gemfire</module>-->
-- 
GitLab