diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java index 343a2e5e42d7fa0fda9c9c40e5290f84f7775864..644c9fcb5e41a53a22e12ee74344fbae2503b9f6 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java @@ -11,7 +11,6 @@ package com.yahoo.ycsb.db; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -19,6 +18,8 @@ import java.util.Set; import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.InsertManyOptions; import org.bson.Document; import org.bson.types.Binary; @@ -26,13 +27,10 @@ import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; -import com.mongodb.bulk.BulkWriteResult; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; -import com.mongodb.client.model.BulkWriteOptions; -import com.mongodb.client.model.InsertOneModel; import com.mongodb.client.model.UpdateOptions; import com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.UpdateResult; @@ -53,14 +51,20 @@ import com.yahoo.ycsb.DBException; */ public class MongoDbClient extends DB { - /** Update options to do an upsert. */ - private static final UpdateOptions UPSERT = new UpdateOptions() - .upsert(true); - /** Used to include a field in a response. */ - protected static final Integer INCLUDE = Integer.valueOf(1); + private static final Integer INCLUDE = Integer.valueOf(1); - /** The database name to access. */ + /** The options to use for inserting many documents */ + private static final InsertManyOptions INSERT_UNORDERED = + new InsertManyOptions().ordered(false); + + /** The options to use for inserting a single document */ + private static final UpdateOptions UPDATE_WITH_UPSERT = + new UpdateOptions().upsert(true); + + /** + * The database name to access. + */ private static String databaseName; /** The database name to access. */ @@ -85,7 +89,7 @@ public class MongoDbClient extends DB { private static int batchSize; /** The bulk inserts pending for the thread. */ - private final List<InsertOneModel<Document>> bulkInserts = new ArrayList<InsertOneModel<Document>>(); + private final List<Document> bulkInserts = new ArrayList<Document>(); /** * Cleanup any state for this DB. Called once per DB instance; there is one @@ -199,7 +203,9 @@ public class MongoDbClient extends DB { writeConcern = uri.getOptions().getWriteConcern(); mongoClient = new MongoClient(uri); - database = mongoClient.getDatabase(databaseName); + database = mongoClient.getDatabase(databaseName) + .withReadPreference(readPreference) + .withWriteConcern(writeConcern); System.out.println("mongo client connection created with " + url); @@ -234,59 +240,31 @@ public class MongoDbClient extends DB { try { MongoCollection<Document> collection = database .getCollection(table); - Document criteria = new Document("_id", key); Document toInsert = new Document("_id", key); for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { toInsert.put(entry.getKey(), entry.getValue().toArray()); } - // Do a single upsert. - if (batchSize <= 1) { - UpdateResult result = collection.withWriteConcern(writeConcern) - .replaceOne(criteria, toInsert, UPSERT); - if (!result.wasAcknowledged() - || result.getMatchedCount() > 0 - || (result.isModifiedCountAvailable() && (result - .getModifiedCount() > 0)) - || result.getUpsertedId() != null) { - return 0; - } - - System.err.println("Nothing inserted for key " + key); - return 1; + if (batchSize == 1) { + // this is effectively an insert, but using an upsert instead due + // to current inability of the framework to clean up after itself + // between test runs. + collection.replaceOne(new Document("_id", toInsert.get("_id")), + toInsert, + UPDATE_WITH_UPSERT); } - - // Use a bulk insert. - try { - bulkInserts.add(new InsertOneModel<Document>(toInsert)); - if (bulkInserts.size() < batchSize) { - return 0; - } - - BulkWriteResult result = collection.withWriteConcern( - writeConcern).bulkWrite(bulkInserts, - new BulkWriteOptions().ordered(false)); - if (!result.wasAcknowledged() - || result.getInsertedCount() == bulkInserts.size()) { + else { + bulkInserts.add(toInsert); + if (bulkInserts.size() == batchSize) { + collection.insertMany(bulkInserts, INSERT_UNORDERED); bulkInserts.clear(); - return 0; } - - System.err - .println("Number of inserted documents doesn't match the number sent, " - + result.getInsertedCount() - + " inserted, sent " + bulkInserts.size()); - bulkInserts.clear(); - return 1; - } - catch (Exception e) { - System.err.println("Exception while trying bulk insert with " - + bulkInserts.size()); - e.printStackTrace(); - return 1; } + return 0; } catch (Exception e) { + System.err.println("Exception while trying bulk insert with " + + bulkInserts.size()); e.printStackTrace(); return 1; } @@ -314,22 +292,19 @@ public class MongoDbClient extends DB { MongoCollection<Document> collection = database .getCollection(table); Document query = new Document("_id", key); - Document fieldsToReturn = new Document(); - Document queryResult = null; + FindIterable<Document> findIterable = collection.find(query); + if (fields != null) { - Iterator<String> iter = fields.iterator(); - while (iter.hasNext()) { - fieldsToReturn.put(iter.next(), INCLUDE); + Document projection = new Document(); + for (String field : fields) { + projection.put(field, INCLUDE); } - queryResult = collection.withReadPreference(readPreference) - .find(query).projection(fieldsToReturn).first(); - } - else { - queryResult = collection.withReadPreference(readPreference) - .find(query).first(); + findIterable.projection(projection); } + Document queryResult = findIterable.first(); + if (queryResult != null) { fillMap(result, queryResult); } @@ -362,8 +337,7 @@ public class MongoDbClient extends DB { @Override public int scan(String table, String startkey, int recordcount, Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { - FindIterable<Document> cursor = null; - MongoCursor<Document> iter = null; + MongoCursor<Document> cursor = null; try { MongoCollection<Document> collection = database .getCollection(table); @@ -371,27 +345,32 @@ public class MongoDbClient extends DB { Document scanRange = new Document("$gte", startkey); Document query = new Document("_id", scanRange); Document sort = new Document("_id", INCLUDE); - Document projection = null; + + FindIterable<Document> findIterable = collection.find(query) + .sort(sort) + .limit(recordcount); + if (fields != null) { - projection = new Document(); + Document projection = new Document(); for (String fieldName : fields) { projection.put(fieldName, INCLUDE); } + findIterable.projection(projection); } - cursor = collection.withReadPreference(readPreference).find(query) - .projection(projection).sort(sort).limit(recordcount); + cursor = findIterable.iterator(); - // Do the query. - iter = cursor.iterator(); - if (!iter.hasNext()) { + if (!cursor.hasNext()) { System.err.println("Nothing found in scan for key " + startkey); return 1; } - while (iter.hasNext()) { + + result.ensureCapacity(recordcount); + + while (cursor.hasNext()) { HashMap<String, ByteIterator> resultMap = new HashMap<String, ByteIterator>(); - Document obj = iter.next(); + Document obj = cursor.next(); fillMap(resultMap, obj); result.add(resultMap); @@ -404,8 +383,8 @@ public class MongoDbClient extends DB { return 1; } finally { - if (iter != null) { - iter.close(); + if (cursor != null) { + cursor.close(); } } } @@ -438,8 +417,7 @@ public class MongoDbClient extends DB { } Document update = new Document("$set", fieldsToSet); - UpdateResult result = collection.withWriteConcern(writeConcern) - .updateOne(query, update); + UpdateResult result = collection.updateOne(query, update); if (result.wasAcknowledged() && result.getMatchedCount() == 0) { System.err.println("Nothing updated for key " + key); return 1; diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/OptionsSupport.java b/mongodb/src/main/java/com/yahoo/ycsb/db/OptionsSupport.java index b8aa6f4d5ddd37d93c326c94a96664045faf1b54..4b1429609512627ae9b465ebeafcc41a40b87f3f 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/OptionsSupport.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/OptionsSupport.java @@ -30,7 +30,7 @@ import java.util.Properties; public final class OptionsSupport { /** Value for an unavailable property. */ - protected static final String UNAVAILABLE = "n/a"; + private static final String UNAVAILABLE = "n/a"; /** * Updates the URL with the appropriate attributes if legacy properties are @@ -76,7 +76,8 @@ public final class OptionsSupport { result = addUrlOption(result, "w", "1"); } else if ("journaled".equals(writeConcernType)) { - result = addUrlOption(result, "journal", "true"); + result = addUrlOption(result, "journal", "true"); // this is the documented option name + result = addUrlOption(result, "j", "true"); // but keep this until MongoDB Java driver supports "journal" option } else if ("replica_acknowledged".equals(writeConcernType)) { result = addUrlOption(result, "w", "2"); diff --git a/mongodb/src/test/java/com/yahoo/ycsb/db/OptionsSupportTest.java b/mongodb/src/test/java/com/yahoo/ycsb/db/OptionsSupportTest.java index 20829d55571b3f7f0db7a16b56acf674be6b8791..a1b9de650f9922e33c7a3b36bbf453bfaab03ee1 100644 --- a/mongodb/src/test/java/com/yahoo/ycsb/db/OptionsSupportTest.java +++ b/mongodb/src/test/java/com/yahoo/ycsb/db/OptionsSupportTest.java @@ -110,7 +110,7 @@ public class OptionsSupportTest { assertThat( updateUrl("mongodb://locahost:27017/?foo=bar", props("mongodb.writeConcern", "journaled")), - is("mongodb://locahost:27017/?foo=bar&journal=true")); + is("mongodb://locahost:27017/?foo=bar&journal=true&j=true")); assertThat( updateUrl("mongodb://locahost:27017/?foo=bar", props("mongodb.writeConcern", "replica_acknowledged")),