Skip to content
Snippets Groups Projects
Commit 8e001060 authored by Robert J. Moore's avatar Robert J. Moore
Browse files

Optimizations.

parent 7e0f6320
No related branches found
No related tags found
No related merge requests found
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
package com.yahoo.ycsb.db; package com.yahoo.ycsb.db;
import static com.allanbank.mongodb.builder.QueryBuilder.where;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
...@@ -14,22 +16,21 @@ import java.util.Set; ...@@ -14,22 +16,21 @@ import java.util.Set;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import com.allanbank.mongodb.MongoIterator;
import com.allanbank.mongodb.Durability; import com.allanbank.mongodb.Durability;
import com.allanbank.mongodb.LockType; import com.allanbank.mongodb.LockType;
import com.allanbank.mongodb.Mongo; import com.allanbank.mongodb.MongoClient;
import com.allanbank.mongodb.MongoCollection; import com.allanbank.mongodb.MongoCollection;
import com.allanbank.mongodb.MongoDatabase; import com.allanbank.mongodb.MongoDatabase;
import com.allanbank.mongodb.MongoDbUri; import com.allanbank.mongodb.MongoDbUri;
import com.allanbank.mongodb.MongoFactory; import com.allanbank.mongodb.MongoFactory;
import com.allanbank.mongodb.MongoIterator;
import com.allanbank.mongodb.bson.Document; import com.allanbank.mongodb.bson.Document;
import com.allanbank.mongodb.bson.Element; import com.allanbank.mongodb.bson.Element;
import com.allanbank.mongodb.bson.ElementType;
import com.allanbank.mongodb.bson.builder.BuilderFactory; import com.allanbank.mongodb.bson.builder.BuilderFactory;
import com.allanbank.mongodb.bson.builder.DocumentBuilder; import com.allanbank.mongodb.bson.builder.DocumentBuilder;
import com.allanbank.mongodb.bson.element.BinaryElement; import com.allanbank.mongodb.bson.element.BinaryElement;
import com.allanbank.mongodb.builder.Find; import com.allanbank.mongodb.builder.Find;
import com.allanbank.mongodb.builder.QueryBuilder;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException; import com.yahoo.ycsb.DBException;
...@@ -51,7 +52,7 @@ public class AsyncMongoDbClient extends DB { ...@@ -51,7 +52,7 @@ public class AsyncMongoDbClient extends DB {
private static String database; private static String database;
/** The connection to MongoDB. */ /** The connection to MongoDB. */
private static Mongo mongo; private static MongoClient mongo;
/** The database to MongoDB. */ /** The database to MongoDB. */
private MongoDatabase db; private MongoDatabase db;
...@@ -111,15 +112,28 @@ public class AsyncMongoDbClient extends DB { ...@@ -111,15 +112,28 @@ public class AsyncMongoDbClient extends DB {
*/ */
@Override @Override
public final void init() throws DBException { public final void init() throws DBException {
initCount.incrementAndGet(); int count = initCount.incrementAndGet();
final Properties props = getProperties();
final String maxConnections = props.getProperty(
"mongodb.maxconnections", "10");
final int connections = Integer.parseInt(maxConnections);
synchronized (AsyncMongoDbClient.class) { synchronized (AsyncMongoDbClient.class) {
if (mongo != null) { if (mongo != null) {
db = mongo.getDatabase(database); db = mongo.getDatabase(database);
// If there are more threads (count) than connections then the
// Low latency spin lock is not really needed as we will keep
// the connections occupied.
if (count > connections) {
mongo.getConfig().setLockType(LockType.MUTEX);
}
return; return;
} }
// initialize MongoDb driver // initialize MongoDb driver
final Properties props = getProperties();
String url = props.getProperty("mongodb.url", String url = props.getProperty("mongodb.url",
"mongodb://localhost:27017"); "mongodb://localhost:27017");
database = props.getProperty("mongodb.database", "ycsb"); database = props.getProperty("mongodb.database", "ycsb");
...@@ -127,8 +141,6 @@ public class AsyncMongoDbClient extends DB { ...@@ -127,8 +141,6 @@ public class AsyncMongoDbClient extends DB {
"mongodb.writeConcern", "mongodb.writeConcern",
props.getProperty("mongodb.durability", "safe")) props.getProperty("mongodb.durability", "safe"))
.toLowerCase(); .toLowerCase();
final String maxConnections = props.getProperty(
"mongodb.maxconnections", "10");
if ("none".equals(writeConcernType)) { if ("none".equals(writeConcernType)) {
writeConcern = Durability.NONE; writeConcern = Durability.NONE;
...@@ -158,10 +170,9 @@ public class AsyncMongoDbClient extends DB { ...@@ -158,10 +170,9 @@ public class AsyncMongoDbClient extends DB {
// need to append db to url. // need to append db to url.
url += "/" + database; url += "/" + database;
System.out.println("new database url = " + url); System.out.println("new database url = " + url);
mongo = MongoFactory.create(new MongoDbUri(url)); mongo = MongoFactory.createClient(new MongoDbUri(url));
mongo.getConfig().setMaxConnectionCount( mongo.getConfig().setMaxConnectionCount(connections);
Integer.parseInt(maxConnections)); mongo.getConfig().setLockType(LockType.LOW_LATENCY_SPIN); // assumed...
mongo.getConfig().setLockType(LockType.LOW_LATENCY_SPIN);
db = mongo.getDatabase(database); db = mongo.getDatabase(database);
System.out.println("mongo connection created with " + url); System.out.println("mongo connection created with " + url);
...@@ -195,7 +206,8 @@ public class AsyncMongoDbClient extends DB { ...@@ -195,7 +206,8 @@ public class AsyncMongoDbClient extends DB {
final HashMap<String, ByteIterator> values) { final HashMap<String, ByteIterator> values) {
try { try {
final MongoCollection collection = db.getCollection(table); final MongoCollection collection = db.getCollection(table);
final DocumentBuilder r = BuilderFactory.start().add("_id", key); final DocumentBuilder r = DOCUMENT_BUILDER.get().reset()
.add("_id", key);
for (final Map.Entry<String, ByteIterator> entry : values for (final Map.Entry<String, ByteIterator> entry : values
.entrySet()) { .entrySet()) {
r.add(entry.getKey(), entry.getValue().toArray()); r.add(entry.getKey(), entry.getValue().toArray());
...@@ -209,6 +221,14 @@ public class AsyncMongoDbClient extends DB { ...@@ -209,6 +221,14 @@ public class AsyncMongoDbClient extends DB {
} }
} }
/** Thread local document builder. */
private static final ThreadLocal<DocumentBuilder> DOCUMENT_BUILDER = new ThreadLocal<DocumentBuilder>() {
@Override
protected DocumentBuilder initialValue() {
return BuilderFactory.start();
}
};
/** /**
* Read a record from the database. Each field/value pair from the result * Read a record from the database. Each field/value pair from the result
* will be stored in a HashMap. * will be stored in a HashMap.
...@@ -228,7 +248,7 @@ public class AsyncMongoDbClient extends DB { ...@@ -228,7 +248,7 @@ public class AsyncMongoDbClient extends DB {
final Set<String> fields, final HashMap<String, ByteIterator> result) { final Set<String> fields, final HashMap<String, ByteIterator> result) {
try { try {
final MongoCollection collection = db.getCollection(table); final MongoCollection collection = db.getCollection(table);
final Document q = BuilderFactory.start().add("_id", key).build(); final DocumentBuilder q = BuilderFactory.start().add("_id", key);
final DocumentBuilder fieldsToReturn = BuilderFactory.start(); final DocumentBuilder fieldsToReturn = BuilderFactory.start();
Document queryResult = null; Document queryResult = null;
...@@ -239,12 +259,11 @@ public class AsyncMongoDbClient extends DB { ...@@ -239,12 +259,11 @@ public class AsyncMongoDbClient extends DB {
} }
final Find.Builder fb = new Find.Builder(q); final Find.Builder fb = new Find.Builder(q);
fb.setReturnFields(fieldsToReturn); fb.projection(fieldsToReturn);
fb.setLimit(1); fb.setLimit(1);
fb.setBatchSize(1); fb.setBatchSize(1);
final MongoIterator<Document> ci = collection.find(fb final MongoIterator<Document> ci = collection.find(fb.build());
.build());
if (ci.hasNext()) { if (ci.hasNext()) {
queryResult = ci.next(); queryResult = ci.next();
ci.close(); ci.close();
...@@ -291,10 +310,9 @@ public class AsyncMongoDbClient extends DB { ...@@ -291,10 +310,9 @@ public class AsyncMongoDbClient extends DB {
try { try {
final MongoCollection collection = db.getCollection(table); final MongoCollection collection = db.getCollection(table);
// { "_id":{"$gte":startKey, "$lte":{"appId":key+"\uFFFF"}} } // { "_id":{"$gte":startKey}} }
final Find.Builder fb = new Find.Builder(); final Find.Builder fb = new Find.Builder();
fb.setQuery(QueryBuilder.where("_id") fb.setQuery(where("_id").greaterThanOrEqualTo(startkey));
.greaterThanOrEqualTo(startkey));
fb.setLimit(recordcount); fb.setLimit(recordcount);
fb.setBatchSize(recordcount); fb.setBatchSize(recordcount);
if (fields != null) { if (fields != null) {
...@@ -303,12 +321,11 @@ public class AsyncMongoDbClient extends DB { ...@@ -303,12 +321,11 @@ public class AsyncMongoDbClient extends DB {
fieldsDoc.add(field, 1); fieldsDoc.add(field, 1);
} }
fb.setReturnFields(fieldsDoc); fb.projection(fieldsDoc);
} }
result.ensureCapacity(recordcount); result.ensureCapacity(recordcount);
final MongoIterator<Document> cursor = collection.find(fb final MongoIterator<Document> cursor = collection.find(fb.build());
.build());
while (cursor.hasNext()) { while (cursor.hasNext()) {
// toMap() returns a Map but result.add() expects a // toMap() returns a Map but result.add() expects a
// Map<String,String>. Hence, the suppress warnings. // Map<String,String>. Hence, the suppress warnings.
...@@ -347,7 +364,7 @@ public class AsyncMongoDbClient extends DB { ...@@ -347,7 +364,7 @@ public class AsyncMongoDbClient extends DB {
final HashMap<String, ByteIterator> values) { final HashMap<String, ByteIterator> values) {
try { try {
final MongoCollection collection = db.getCollection(table); final MongoCollection collection = db.getCollection(table);
final Document q = BuilderFactory.start().add("_id", key).build(); final DocumentBuilder q = BuilderFactory.start().add("_id", key);
final DocumentBuilder u = BuilderFactory.start(); final DocumentBuilder u = BuilderFactory.start();
final DocumentBuilder fieldsToSet = u.push("$set"); final DocumentBuilder fieldsToSet = u.push("$set");
for (final Map.Entry<String, ByteIterator> entry : values for (final Map.Entry<String, ByteIterator> entry : values
...@@ -375,10 +392,73 @@ public class AsyncMongoDbClient extends DB { ...@@ -375,10 +392,73 @@ public class AsyncMongoDbClient extends DB {
protected final void fillMap(final HashMap<String, ByteIterator> result, protected final void fillMap(final HashMap<String, ByteIterator> result,
final Document queryResult) { final Document queryResult) {
for (final Element be : queryResult) { for (final Element be : queryResult) {
if (be instanceof BinaryElement) { if (be.getType() == ElementType.BINARY) {
result.put(be.getName(), new ByteArrayByteIterator( result.put(be.getName(), new BinaryByteArrayIterator(
((BinaryElement) be).getValue())); (BinaryElement) be));
} }
} }
} }
/**
* BinaryByteArrayIterator provides an adapter from a {@link BinaryElement}
* to a {@link ByteIterator}.
*
* @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
*/
private final static class BinaryByteArrayIterator extends ByteIterator {
/** The binary data. */
private final BinaryElement binaryElement;
/** The current offset into the binary element. */
private int offset;
/**
* Creates a new BinaryByteArrayIterator.
*
* @param element
* The {@link BinaryElement} to iterate over.
*/
public BinaryByteArrayIterator(BinaryElement element) {
this.binaryElement = element;
this.offset = 0;
}
/**
* {@inheritDoc}
* <p>
* Overridden to return true if there is more data in the
* {@link BinaryElement}.
* </p>
*/
@Override
public boolean hasNext() {
return (offset < binaryElement.length());
}
/**
* {@inheritDoc}
* <p>
* Overridden to return the myNext value and advance the iterator.
* </p>
*/
@Override
public byte nextByte() {
byte value = binaryElement.get(offset);
offset += 1;
return value;
}
/**
* {@inheritDoc}
* <p>
* Overridden to return the number of bytes remaining in the iterator.
* </p>
*/
@Override
public long bytesLeft() {
return Math.max(0, binaryElement.length() - offset);
}
}
} }
...@@ -9,14 +9,29 @@ ...@@ -9,14 +9,29 @@
package com.yahoo.ycsb.db; package com.yahoo.ycsb.db;
import com.mongodb.*; import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
import com.mongodb.BasicDBObject;
import com.mongodb.DBAddress;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.WriteConcern;
import com.mongodb.WriteResult;
import com.yahoo.ycsb.ByteArrayByteIterator; import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException; import com.yahoo.ycsb.DBException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* MongoDB client for YCSB framework. * MongoDB client for YCSB framework.
...@@ -34,7 +49,7 @@ public class MongoDbClient extends DB { ...@@ -34,7 +49,7 @@ public class MongoDbClient extends DB {
protected static final Integer INCLUDE = Integer.valueOf(1); protected static final Integer INCLUDE = Integer.valueOf(1);
/** A singleton Mongo instance. */ /** A singleton Mongo instance. */
private static Mongo mongo; private static MongoClient mongo;
/** The default write concern for the test. */ /** The default write concern for the test. */
private static WriteConcern writeConcern; private static WriteConcern writeConcern;
...@@ -42,12 +57,15 @@ public class MongoDbClient extends DB { ...@@ -42,12 +57,15 @@ public class MongoDbClient extends DB {
/** The database to access. */ /** The database to access. */
private static String database; private static String database;
/** Count the number of times initialized to teardown on the last {@link #cleanup()}. */ /**
* Count the number of times initialized to teardown on the last
* {@link #cleanup()}.
*/
private static final AtomicInteger initCount = new AtomicInteger(0); private static final AtomicInteger initCount = new AtomicInteger(0);
/** /**
* Initialize any state for this DB. * Initialize any state for this DB. Called once per DB instance; there is
* Called once per DB instance; there is one DB instance per client thread. * one DB instance per client thread.
*/ */
@Override @Override
public void init() throws DBException { public void init() throws DBException {
...@@ -75,6 +93,10 @@ public class MongoDbClient extends DB { ...@@ -75,6 +93,10 @@ public class MongoDbClient extends DB {
"safe").toLowerCase(); "safe").toLowerCase();
final String maxConnections = props.getProperty( final String maxConnections = props.getProperty(
"mongodb.maxconnections", "10"); "mongodb.maxconnections", "10");
final String threadsAllowedToBlockForConnectionMultiplier = props
.getProperty(
"mongodb.threadsAllowedToBlockForConnectionMultiplier",
"5");
if ("none".equals(writeConcernType)) { if ("none".equals(writeConcernType)) {
writeConcern = WriteConcern.NONE; writeConcern = WriteConcern.NONE;
...@@ -111,9 +133,13 @@ public class MongoDbClient extends DB { ...@@ -111,9 +133,13 @@ public class MongoDbClient extends DB {
// need to append db to url. // need to append db to url.
url += "/" + database; url += "/" + database;
System.out.println("new database url = " + url); System.out.println("new database url = " + url);
MongoOptions options = new MongoOptions(); MongoClientOptions options = MongoClientOptions
options.connectionsPerHost = Integer.parseInt(maxConnections); .builder()
mongo = new Mongo(new DBAddress(url), options); .connectionsPerHost(Integer.parseInt(maxConnections))
.threadsAllowedToBlockForConnectionMultiplier(
Integer.parseInt(threadsAllowedToBlockForConnectionMultiplier))
.build();
mongo = new MongoClient(new DBAddress(url), options);
System.out.println("mongo connection created with " + url); System.out.println("mongo connection created with " + url);
} }
...@@ -128,8 +154,8 @@ public class MongoDbClient extends DB { ...@@ -128,8 +154,8 @@ public class MongoDbClient extends DB {
} }
/** /**
* Cleanup any state for this DB. * Cleanup any state for this DB. Called once per DB instance; there is one
* Called once per DB instance; there is one DB instance per client thread. * DB instance per client thread.
*/ */
@Override @Override
public void cleanup() throws DBException { public void cleanup() throws DBException {
...@@ -148,10 +174,13 @@ public class MongoDbClient extends DB { ...@@ -148,10 +174,13 @@ public class MongoDbClient extends DB {
/** /**
* Delete a record from the database. * Delete a record from the database.
* *
* @param table The name of the table * @param table
* @param key The record key of the record to delete. * The name of the table
* @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes. * @param key
* The record key of the record to delete.
* @return Zero on success, a non-zero error code on error. See this class's
* description for a discussion of error codes.
*/ */
@Override @Override
public int delete(String table, String key) { public int delete(String table, String key) {
...@@ -176,13 +205,18 @@ public class MongoDbClient extends DB { ...@@ -176,13 +205,18 @@ public class MongoDbClient extends DB {
} }
/** /**
* Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified * Insert a record in the database. Any field/value pairs in the specified
* record key. * values HashMap will be written into the record with the specified record
* * key.
* @param table The name of the table *
* @param key The record key of the record to insert. * @param table
* @param values A HashMap of field/value pairs to insert in the record * The name of the table
* @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes. * @param key
* The record key of the record to insert.
* @param values
* A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error. See this class's
* description for a discussion of error codes.
*/ */
@Override @Override
public int insert(String table, String key, public int insert(String table, String key,
...@@ -195,8 +229,8 @@ public class MongoDbClient extends DB { ...@@ -195,8 +229,8 @@ public class MongoDbClient extends DB {
DBCollection collection = db.getCollection(table); DBCollection collection = db.getCollection(table);
DBObject r = new BasicDBObject().append("_id", key); DBObject r = new BasicDBObject().append("_id", key);
for (String k : values.keySet()) { for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
r.put(k, values.get(k).toArray()); r.put(entry.getKey(), entry.getValue().toArray());
} }
WriteResult res = collection.insert(r, writeConcern); WriteResult res = collection.insert(r, writeConcern);
return res.getError() == null ? 0 : 1; return res.getError() == null ? 0 : 1;
...@@ -213,12 +247,17 @@ public class MongoDbClient extends DB { ...@@ -213,12 +247,17 @@ public class MongoDbClient extends DB {
} }
/** /**
* Read a record from the database. Each field/value pair from the result will be stored in a HashMap. * Read a record from the database. Each field/value pair from the result
* * will be stored in a HashMap.
* @param table The name of the table *
* @param key The record key of the record to read. * @param table
* @param fields The list of fields to read, or null for all of them * The name of the table
* @param result A HashMap of field/value pairs for the result * @param key
* The record key of the record to read.
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error or "not found". * @return Zero on success, a non-zero error code on error or "not found".
*/ */
@Override @Override
...@@ -264,13 +303,18 @@ public class MongoDbClient extends DB { ...@@ -264,13 +303,18 @@ public class MongoDbClient extends DB {
} }
/** /**
* Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified * Update a record in the database. Any field/value pairs in the specified
* record key, overwriting any existing values with the same field name. * values HashMap will be written into the record with the specified record
* * key, overwriting any existing values with the same field name.
* @param table The name of the table *
* @param key The record key of the record to write. * @param table
* @param values A HashMap of field/value pairs to update in the record * The name of the table
* @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes. * @param key
* The record key of the record to write.
* @param values
* A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error. See this class's
* description for a discussion of error codes.
*/ */
@Override @Override
public int update(String table, String key, public int update(String table, String key,
...@@ -308,14 +352,22 @@ public class MongoDbClient extends DB { ...@@ -308,14 +352,22 @@ public class MongoDbClient extends DB {
} }
/** /**
* Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in a HashMap. * Perform a range scan for a set of records in the database. Each
* * field/value pair from the result will be stored in a HashMap.
* @param table The name of the table *
* @param startkey The record key of the first record to read. * @param table
* @param recordcount The number of records to read * The name of the table
* @param fields The list of fields to read, or null for all of them * @param startkey
* @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record * The record key of the first record to read.
* @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes. * @param recordcount
* The number of records to read
* @param fields
* The list of fields to read, or null for all of them
* @param result
* A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one record
* @return Zero on success, a non-zero error code on error. See this class's
* description for a discussion of error codes.
*/ */
@Override @Override
public int scan(String table, String startkey, int recordcount, public int scan(String table, String startkey, int recordcount,
...@@ -355,10 +407,12 @@ public class MongoDbClient extends DB { ...@@ -355,10 +407,12 @@ public class MongoDbClient extends DB {
} }
/** /**
* TODO - Finish * Fills the map with the values from the DBObject.
* *
* @param resultMap * @param resultMap
* The map to fill/
* @param obj * @param obj
* The object to copy values from.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected void fillMap(HashMap<String, ByteIterator> resultMap, DBObject obj) { protected void fillMap(HashMap<String, ByteIterator> resultMap, DBObject obj) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment