From 6e5177fd8b911418342272947e1b9cf8178db241 Mon Sep 17 00:00:00 2001 From: Mike Drob <madrob@cloudera.com> Date: Wed, 5 Feb 2014 12:13:46 -0500 Subject: [PATCH] Cleanup and upgrade to work with Accumulo 1.6.0. * Increment Accumulo Version to 1.6.0 * Fix accumulo dependency versions - Harmonize guava version for Accumulo. - Automatically manage slf4j version. * Clean up depr. code * Clean up return codes and error logging --- accumulo/pom.xml | 12 +-- .../com/yahoo/ycsb/db/AccumuloClient.java | 79 +++++++++---------- pom.xml | 2 +- 3 files changed, 42 insertions(+), 51 deletions(-) diff --git a/accumulo/pom.xml b/accumulo/pom.xml index ffc64112..e74e2c1c 100644 --- a/accumulo/pom.xml +++ b/accumulo/pom.xml @@ -42,6 +42,7 @@ <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.8.1</version> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> @@ -56,7 +57,7 @@ <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> - <version>r08</version> + <version>15.0</version> </dependency> <dependency> <groupId>com.yahoo.ycsb</groupId> @@ -93,13 +94,4 @@ <url>http://repository.apache.org/snapshots</url> </repository> </repositories> - <dependencyManagement> - <dependencies> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>1.5.8</version> - </dependency> - </dependencies> - </dependencyManagement> </project> diff --git a/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java b/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java index c397573b..0551849c 100644 --- a/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java +++ b/accumulo/src/main/java/com/yahoo/ycsb/db/AccumuloClient.java @@ -1,38 +1,40 @@ package com.yahoo.ycsb.db; -import com.yahoo.ycsb.ByteIterator; - import java.util.HashMap; import java.util.HashSet; import java.util.Hashtable; import java.util.Map; +import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.Vector; -import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.CleanUp; import org.apache.hadoop.io.Text; import org.apache.zookeeper.KeeperException; +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; -import com.yahoo.ycsb.ByteArrayByteIterator; public class AccumuloClient extends DB { // Error code constants. @@ -59,20 +61,19 @@ public class AccumuloClient extends DB { @Override - public void init() { + public void init() throws DBException { _colFam = new Text(getProperties().getProperty("accumulo.columnFamily")); _inst = new ZooKeeperInstance(getProperties().getProperty("accumulo.instanceName"), getProperties().getProperty("accumulo.zooKeepers")); try { - _connector = _inst.getConnector(getProperties().getProperty("accumulo.username"), - getProperties().getProperty("accumulo.password")); + String principal = getProperties().getProperty("accumulo.username"); + AuthenticationToken token = new PasswordToken(getProperties().getProperty("accumulo.password")); + _connector = _inst.getConnector(principal, token); } catch (AccumuloException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + throw new DBException(e); } catch (AccumuloSecurityException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + throw new DBException(e); } _PC_FLAG = getProperties().getProperty("accumulo.PC_FLAG","none"); @@ -135,13 +136,16 @@ public class AccumuloClient extends DB { } } - long bwSize = Long.parseLong(getProperties().getProperty("accumulo.batchWriterSize", "100000")); - long bwMaxLatency = Long.parseLong(getProperties().getProperty("accumulo.batchWriterMaxLatency", "30000")); - int bwThreads = Integer.parseInt(getProperties().getProperty("accumulo.batchWriterThreads", "1")); - _bw = _connector.createBatchWriter(table, bwSize, bwMaxLatency, bwThreads); + BatchWriterConfig bwc = new BatchWriterConfig(); + bwc.setMaxLatency(Long.parseLong(getProperties().getProperty("accumulo.batchWriterMaxLatency", "30000")), TimeUnit.MILLISECONDS); + bwc.setMaxMemory(Long.parseLong(getProperties().getProperty("accumulo.batchWriterSize", "100000"))); + bwc.setMaxWriteThreads(Integer.parseInt(getProperties().getProperty("accumulo.batchWriterThreads", "1"))); + + _bw = _connector.createBatchWriter(table, bwc); + // Create our scanners - _singleScanner = _connector.createScanner(table, Constants.NO_AUTHS); - _scanScanner = _connector.createScanner(table, Constants.NO_AUTHS); + _singleScanner = _connector.createScanner(table, Authorizations.EMPTY); + _scanScanner = _connector.createScanner(table, Authorizations.EMPTY); _table = table; // Store the name of the table we have open. } @@ -149,8 +153,9 @@ public class AccumuloClient extends DB { /** * Gets a scanner from Accumulo over one row * - * @param row - * @return + * @param row the row to scan + * @param fields the set of columns to scan + * @return an Accumulo {@link Scanner} bound to the given row and columns */ private Scanner getRow(Text row, Set<String> fields) { @@ -188,7 +193,7 @@ public class AccumuloClient extends DB { System.err.println("Error trying to reading Accumulo table" + key + e); return ServerError; } - return 0; + return Ok; } @@ -252,7 +257,7 @@ public class AccumuloClient extends DB { currentHM.put(entry.getKey().getColumnQualifier().toString(), new ByteArrayByteIterator(buf)); } - return 0; + return Ok; } @@ -281,12 +286,13 @@ public class AccumuloClient extends DB { keyNotification(key); } } catch (MutationsRejectedException e) { - // TODO Auto-generated catch block + System.err.println("Error performing update."); e.printStackTrace(); + return ServerError; } - return 0; + return Ok; } @Override @@ -303,26 +309,19 @@ public class AccumuloClient extends DB { return ServerError; } - try { deleteRow(new Text(key)); - } catch (SecurityException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (TableNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (Exception e) { - // TODO Auto-generated catch block + } catch (RuntimeException e) { + System.err.println("Error performing delete."); e.printStackTrace(); + return ServerError; } - - return 0; + return Ok; } // These functions are adapted from RowOperations.java: - private void deleteRow(Text row) throws TableNotFoundException { + private void deleteRow(Text row) { deleteRow(getRow(row, null)); } @@ -430,7 +429,7 @@ public class AccumuloClient extends DB { // TODO Auto-generated catch block e.printStackTrace(); } - return 0; + return Ok; } } diff --git a/pom.xml b/pom.xml index f4ec5e10..287c7178 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,7 @@ <properties> <maven.assembly.version>2.2.1</maven.assembly.version> <hbase.version>0.92.1</hbase.version> - <accumulo.version>1.4.5</accumulo.version> + <accumulo.version>1.6.0</accumulo.version> <cassandra.version>0.7.0</cassandra.version> <infinispan.version>7.1.0.CR1</infinispan.version> <openjpa.jdbc.version>2.1.1</openjpa.jdbc.version> -- GitLab