diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java index 9b6a9e84a5d54b5b0d15b3eec4b0b57b99898fa5..1b16220ce799e2130f1f3b2af7e2d195ab065a9a 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java @@ -6,6 +6,8 @@ package com.yahoo.ycsb.db; +import static com.allanbank.mongodb.builder.QueryBuilder.where; + import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -14,22 +16,21 @@ import java.util.Set; import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; -import com.allanbank.mongodb.MongoIterator; import com.allanbank.mongodb.Durability; import com.allanbank.mongodb.LockType; -import com.allanbank.mongodb.Mongo; +import com.allanbank.mongodb.MongoClient; import com.allanbank.mongodb.MongoCollection; import com.allanbank.mongodb.MongoDatabase; import com.allanbank.mongodb.MongoDbUri; import com.allanbank.mongodb.MongoFactory; +import com.allanbank.mongodb.MongoIterator; import com.allanbank.mongodb.bson.Document; import com.allanbank.mongodb.bson.Element; +import com.allanbank.mongodb.bson.ElementType; import com.allanbank.mongodb.bson.builder.BuilderFactory; import com.allanbank.mongodb.bson.builder.DocumentBuilder; import com.allanbank.mongodb.bson.element.BinaryElement; import com.allanbank.mongodb.builder.Find; -import com.allanbank.mongodb.builder.QueryBuilder; -import com.yahoo.ycsb.ByteArrayByteIterator; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; @@ -51,7 +52,7 @@ public class AsyncMongoDbClient extends DB { private static String database; /** The connection to MongoDB. */ - private static Mongo mongo; + private static MongoClient mongo; /** The database to MongoDB. */ private MongoDatabase db; @@ -111,15 +112,28 @@ public class AsyncMongoDbClient extends DB { */ @Override public final void init() throws DBException { - initCount.incrementAndGet(); + int count = initCount.incrementAndGet(); + + final Properties props = getProperties(); + final String maxConnections = props.getProperty( + "mongodb.maxconnections", "10"); + final int connections = Integer.parseInt(maxConnections); + synchronized (AsyncMongoDbClient.class) { if (mongo != null) { db = mongo.getDatabase(database); + + // If there are more threads (count) than connections then the + // Low latency spin lock is not really needed as we will keep + // the connections occupied. + if (count > connections) { + mongo.getConfig().setLockType(LockType.MUTEX); + } + return; } // initialize MongoDb driver - final Properties props = getProperties(); String url = props.getProperty("mongodb.url", "mongodb://localhost:27017"); database = props.getProperty("mongodb.database", "ycsb"); @@ -127,8 +141,6 @@ public class AsyncMongoDbClient extends DB { "mongodb.writeConcern", props.getProperty("mongodb.durability", "safe")) .toLowerCase(); - final String maxConnections = props.getProperty( - "mongodb.maxconnections", "10"); if ("none".equals(writeConcernType)) { writeConcern = Durability.NONE; @@ -158,10 +170,9 @@ public class AsyncMongoDbClient extends DB { // need to append db to url. url += "/" + database; System.out.println("new database url = " + url); - mongo = MongoFactory.create(new MongoDbUri(url)); - mongo.getConfig().setMaxConnectionCount( - Integer.parseInt(maxConnections)); - mongo.getConfig().setLockType(LockType.LOW_LATENCY_SPIN); + mongo = MongoFactory.createClient(new MongoDbUri(url)); + mongo.getConfig().setMaxConnectionCount(connections); + mongo.getConfig().setLockType(LockType.LOW_LATENCY_SPIN); // assumed... db = mongo.getDatabase(database); System.out.println("mongo connection created with " + url); @@ -195,7 +206,8 @@ public class AsyncMongoDbClient extends DB { final HashMap<String, ByteIterator> values) { try { final MongoCollection collection = db.getCollection(table); - final DocumentBuilder r = BuilderFactory.start().add("_id", key); + final DocumentBuilder r = DOCUMENT_BUILDER.get().reset() + .add("_id", key); for (final Map.Entry<String, ByteIterator> entry : values .entrySet()) { r.add(entry.getKey(), entry.getValue().toArray()); @@ -209,6 +221,14 @@ public class AsyncMongoDbClient extends DB { } } + /** Thread local document builder. */ + private static final ThreadLocal<DocumentBuilder> DOCUMENT_BUILDER = new ThreadLocal<DocumentBuilder>() { + @Override + protected DocumentBuilder initialValue() { + return BuilderFactory.start(); + } + }; + /** * Read a record from the database. Each field/value pair from the result * will be stored in a HashMap. @@ -228,7 +248,7 @@ public class AsyncMongoDbClient extends DB { final Set<String> fields, final HashMap<String, ByteIterator> result) { try { final MongoCollection collection = db.getCollection(table); - final Document q = BuilderFactory.start().add("_id", key).build(); + final DocumentBuilder q = BuilderFactory.start().add("_id", key); final DocumentBuilder fieldsToReturn = BuilderFactory.start(); Document queryResult = null; @@ -239,12 +259,11 @@ public class AsyncMongoDbClient extends DB { } final Find.Builder fb = new Find.Builder(q); - fb.setReturnFields(fieldsToReturn); + fb.projection(fieldsToReturn); fb.setLimit(1); fb.setBatchSize(1); - final MongoIterator<Document> ci = collection.find(fb - .build()); + final MongoIterator<Document> ci = collection.find(fb.build()); if (ci.hasNext()) { queryResult = ci.next(); ci.close(); @@ -291,10 +310,9 @@ public class AsyncMongoDbClient extends DB { try { final MongoCollection collection = db.getCollection(table); - // { "_id":{"$gte":startKey, "$lte":{"appId":key+"\uFFFF"}} } + // { "_id":{"$gte":startKey}} } final Find.Builder fb = new Find.Builder(); - fb.setQuery(QueryBuilder.where("_id") - .greaterThanOrEqualTo(startkey)); + fb.setQuery(where("_id").greaterThanOrEqualTo(startkey)); fb.setLimit(recordcount); fb.setBatchSize(recordcount); if (fields != null) { @@ -303,12 +321,11 @@ public class AsyncMongoDbClient extends DB { fieldsDoc.add(field, 1); } - fb.setReturnFields(fieldsDoc); + fb.projection(fieldsDoc); } result.ensureCapacity(recordcount); - final MongoIterator<Document> cursor = collection.find(fb - .build()); + final MongoIterator<Document> cursor = collection.find(fb.build()); while (cursor.hasNext()) { // toMap() returns a Map but result.add() expects a // Map<String,String>. Hence, the suppress warnings. @@ -347,7 +364,7 @@ public class AsyncMongoDbClient extends DB { final HashMap<String, ByteIterator> values) { try { final MongoCollection collection = db.getCollection(table); - final Document q = BuilderFactory.start().add("_id", key).build(); + final DocumentBuilder q = BuilderFactory.start().add("_id", key); final DocumentBuilder u = BuilderFactory.start(); final DocumentBuilder fieldsToSet = u.push("$set"); for (final Map.Entry<String, ByteIterator> entry : values @@ -375,10 +392,73 @@ public class AsyncMongoDbClient extends DB { protected final void fillMap(final HashMap<String, ByteIterator> result, final Document queryResult) { for (final Element be : queryResult) { - if (be instanceof BinaryElement) { - result.put(be.getName(), new ByteArrayByteIterator( - ((BinaryElement) be).getValue())); + if (be.getType() == ElementType.BINARY) { + result.put(be.getName(), new BinaryByteArrayIterator( + (BinaryElement) be)); } } } + + /** + * BinaryByteArrayIterator provides an adapter from a {@link BinaryElement} + * to a {@link ByteIterator}. + * + * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved + */ + private final static class BinaryByteArrayIterator extends ByteIterator { + + /** The binary data. */ + private final BinaryElement binaryElement; + + /** The current offset into the binary element. */ + private int offset; + + /** + * Creates a new BinaryByteArrayIterator. + * + * @param element + * The {@link BinaryElement} to iterate over. + */ + public BinaryByteArrayIterator(BinaryElement element) { + this.binaryElement = element; + this.offset = 0; + } + + /** + * {@inheritDoc} + * <p> + * Overridden to return true if there is more data in the + * {@link BinaryElement}. + * </p> + */ + @Override + public boolean hasNext() { + return (offset < binaryElement.length()); + } + + /** + * {@inheritDoc} + * <p> + * Overridden to return the myNext value and advance the iterator. + * </p> + */ + @Override + public byte nextByte() { + byte value = binaryElement.get(offset); + offset += 1; + + return value; + } + + /** + * {@inheritDoc} + * <p> + * Overridden to return the number of bytes remaining in the iterator. + * </p> + */ + @Override + public long bytesLeft() { + return Math.max(0, binaryElement.length() - offset); + } + } } diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java index ed8b30b9ada31e182d4b3719e88a8cc9a2a1b3a5..25581e01e436d57a0e55cda587a696fa01f32124 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java @@ -9,14 +9,29 @@ package com.yahoo.ycsb.db; -import com.mongodb.*; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; + +import com.mongodb.BasicDBObject; +import com.mongodb.DBAddress; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.WriteConcern; +import com.mongodb.WriteResult; import com.yahoo.ycsb.ByteArrayByteIterator; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DBException; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; /** * MongoDB client for YCSB framework. @@ -34,7 +49,7 @@ public class MongoDbClient extends DB { protected static final Integer INCLUDE = Integer.valueOf(1); /** A singleton Mongo instance. */ - private static Mongo mongo; + private static MongoClient mongo; /** The default write concern for the test. */ private static WriteConcern writeConcern; @@ -42,12 +57,15 @@ public class MongoDbClient extends DB { /** The database to access. */ private static String database; - /** Count the number of times initialized to teardown on the last {@link #cleanup()}. */ + /** + * Count the number of times initialized to teardown on the last + * {@link #cleanup()}. + */ private static final AtomicInteger initCount = new AtomicInteger(0); /** - * Initialize any state for this DB. - * Called once per DB instance; there is one DB instance per client thread. + * Initialize any state for this DB. Called once per DB instance; there is + * one DB instance per client thread. */ @Override public void init() throws DBException { @@ -75,6 +93,10 @@ public class MongoDbClient extends DB { "safe").toLowerCase(); final String maxConnections = props.getProperty( "mongodb.maxconnections", "10"); + final String threadsAllowedToBlockForConnectionMultiplier = props + .getProperty( + "mongodb.threadsAllowedToBlockForConnectionMultiplier", + "5"); if ("none".equals(writeConcernType)) { writeConcern = WriteConcern.NONE; @@ -111,9 +133,13 @@ public class MongoDbClient extends DB { // need to append db to url. url += "/" + database; System.out.println("new database url = " + url); - MongoOptions options = new MongoOptions(); - options.connectionsPerHost = Integer.parseInt(maxConnections); - mongo = new Mongo(new DBAddress(url), options); + MongoClientOptions options = MongoClientOptions + .builder() + .connectionsPerHost(Integer.parseInt(maxConnections)) + .threadsAllowedToBlockForConnectionMultiplier( + Integer.parseInt(threadsAllowedToBlockForConnectionMultiplier)) + .build(); + mongo = new MongoClient(new DBAddress(url), options); System.out.println("mongo connection created with " + url); } @@ -128,8 +154,8 @@ public class MongoDbClient extends DB { } /** - * Cleanup any state for this DB. - * Called once per DB instance; there is one DB instance per client thread. + * Cleanup any state for this DB. Called once per DB instance; there is one + * DB instance per client thread. */ @Override public void cleanup() throws DBException { @@ -148,10 +174,13 @@ public class MongoDbClient extends DB { /** * Delete a record from the database. - * - * @param table The name of the table - * @param key The record key of the record to delete. - * @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes. + * + * @param table + * The name of the table + * @param key + * The record key of the record to delete. + * @return Zero on success, a non-zero error code on error. See this class's + * description for a discussion of error codes. */ @Override public int delete(String table, String key) { @@ -176,13 +205,18 @@ public class MongoDbClient extends DB { } /** - * Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified - * record key. - * - * @param table The name of the table - * @param key The record key of the record to insert. - * @param values A HashMap of field/value pairs to insert in the record - * @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes. + * Insert a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key. + * + * @param table + * The name of the table + * @param key + * The record key of the record to insert. + * @param values + * A HashMap of field/value pairs to insert in the record + * @return Zero on success, a non-zero error code on error. See this class's + * description for a discussion of error codes. */ @Override public int insert(String table, String key, @@ -195,8 +229,8 @@ public class MongoDbClient extends DB { DBCollection collection = db.getCollection(table); DBObject r = new BasicDBObject().append("_id", key); - for (String k : values.keySet()) { - r.put(k, values.get(k).toArray()); + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { + r.put(entry.getKey(), entry.getValue().toArray()); } WriteResult res = collection.insert(r, writeConcern); return res.getError() == null ? 0 : 1; @@ -213,12 +247,17 @@ public class MongoDbClient extends DB { } /** - * Read a record from the database. Each field/value pair from the result will be stored in a HashMap. - * - * @param table The name of the table - * @param key The record key of the record to read. - * @param fields The list of fields to read, or null for all of them - * @param result A HashMap of field/value pairs for the result + * Read a record from the database. Each field/value pair from the result + * will be stored in a HashMap. + * + * @param table + * The name of the table + * @param key + * The record key of the record to read. + * @param fields + * The list of fields to read, or null for all of them + * @param result + * A HashMap of field/value pairs for the result * @return Zero on success, a non-zero error code on error or "not found". */ @Override @@ -264,13 +303,18 @@ public class MongoDbClient extends DB { } /** - * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified - * record key, overwriting any existing values with the same field name. - * - * @param table The name of the table - * @param key The record key of the record to write. - * @param values A HashMap of field/value pairs to update in the record - * @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes. + * Update a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key, overwriting any existing values with the same field name. + * + * @param table + * The name of the table + * @param key + * The record key of the record to write. + * @param values + * A HashMap of field/value pairs to update in the record + * @return Zero on success, a non-zero error code on error. See this class's + * description for a discussion of error codes. */ @Override public int update(String table, String key, @@ -308,14 +352,22 @@ public class MongoDbClient extends DB { } /** - * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in a HashMap. - * - * @param table The name of the table - * @param startkey The record key of the first record to read. - * @param recordcount The number of records to read - * @param fields The list of fields to read, or null for all of them - * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record - * @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes. + * Perform a range scan for a set of records in the database. Each + * field/value pair from the result will be stored in a HashMap. + * + * @param table + * The name of the table + * @param startkey + * The record key of the first record to read. + * @param recordcount + * The number of records to read + * @param fields + * The list of fields to read, or null for all of them + * @param result + * A Vector of HashMaps, where each HashMap is a set field/value + * pairs for one record + * @return Zero on success, a non-zero error code on error. See this class's + * description for a discussion of error codes. */ @Override public int scan(String table, String startkey, int recordcount, @@ -355,10 +407,12 @@ public class MongoDbClient extends DB { } /** - * TODO - Finish + * Fills the map with the values from the DBObject. * * @param resultMap + * The map to fill/ * @param obj + * The object to copy values from. */ @SuppressWarnings("unchecked") protected void fillMap(HashMap<String, ByteIterator> resultMap, DBObject obj) {