diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java index 343a2e5e42d7fa0fda9c9c40e5290f84f7775864..2ce2dd0661517e81cceca591a74e4a13f485f9e1 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java @@ -19,6 +19,7 @@ import java.util.Set; import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; +import com.mongodb.client.model.InsertManyOptions; import org.bson.Document; import org.bson.types.Binary; @@ -53,14 +54,16 @@ import com.yahoo.ycsb.DBException; */ public class MongoDbClient extends DB { - /** Update options to do an upsert. */ - private static final UpdateOptions UPSERT = new UpdateOptions() - .upsert(true); - /** Used to include a field in a response. */ protected static final Integer INCLUDE = Integer.valueOf(1); - /** The database name to access. */ + /** The options to use for inserting many documents */ + protected static final InsertManyOptions INSERT_MANY_OPTIONS = + new InsertManyOptions().ordered(false); + + /** + * The database name to access. + */ private static String databaseName; /** The database name to access. */ @@ -85,7 +88,7 @@ public class MongoDbClient extends DB { private static int batchSize; /** The bulk inserts pending for the thread. */ - private final List<InsertOneModel<Document>> bulkInserts = new ArrayList<InsertOneModel<Document>>(); + private final List<Document> bulkInserts = new ArrayList<Document>(); /** * Cleanup any state for this DB. Called once per DB instance; there is one @@ -234,59 +237,22 @@ public class MongoDbClient extends DB { try { MongoCollection<Document> collection = database .getCollection(table); - Document criteria = new Document("_id", key); Document toInsert = new Document("_id", key); for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { toInsert.put(entry.getKey(), entry.getValue().toArray()); } - // Do a single upsert. - if (batchSize <= 1) { - UpdateResult result = collection.withWriteConcern(writeConcern) - .replaceOne(criteria, toInsert, UPSERT); - if (!result.wasAcknowledged() - || result.getMatchedCount() > 0 - || (result.isModifiedCountAvailable() && (result - .getModifiedCount() > 0)) - || result.getUpsertedId() != null) { - return 0; - } - - System.err.println("Nothing inserted for key " + key); - return 1; - } - - // Use a bulk insert. - try { - bulkInserts.add(new InsertOneModel<Document>(toInsert)); - if (bulkInserts.size() < batchSize) { - return 0; - } - - BulkWriteResult result = collection.withWriteConcern( - writeConcern).bulkWrite(bulkInserts, - new BulkWriteOptions().ordered(false)); - if (!result.wasAcknowledged() - || result.getInsertedCount() == bulkInserts.size()) { - bulkInserts.clear(); - return 0; - } - - System.err - .println("Number of inserted documents doesn't match the number sent, " - + result.getInsertedCount() - + " inserted, sent " + bulkInserts.size()); + bulkInserts.add(toInsert); + if (bulkInserts.size() == batchSize) { + collection.withWriteConcern(writeConcern) + .insertMany(bulkInserts, INSERT_MANY_OPTIONS); bulkInserts.clear(); - return 1; - } - catch (Exception e) { - System.err.println("Exception while trying bulk insert with " - + bulkInserts.size()); - e.printStackTrace(); - return 1; } + return 0; } catch (Exception e) { + System.err.println("Exception while trying bulk insert with " + + bulkInserts.size()); e.printStackTrace(); return 1; } @@ -314,22 +280,22 @@ public class MongoDbClient extends DB { MongoCollection<Document> collection = database .getCollection(table); Document query = new Document("_id", key); - Document fieldsToReturn = new Document(); + + FindIterable<Document> findIterable = collection + .withReadPreference(readPreference) + .find(query); Document queryResult = null; if (fields != null) { - Iterator<String> iter = fields.iterator(); - while (iter.hasNext()) { - fieldsToReturn.put(iter.next(), INCLUDE); + Document projection = new Document(); + for (String field : fields) { + projection.put(field, INCLUDE); } - queryResult = collection.withReadPreference(readPreference) - .find(query).projection(fieldsToReturn).first(); - } - else { - queryResult = collection.withReadPreference(readPreference) - .find(query).first(); + findIterable.projection(projection); } + queryResult = findIterable.first(); + if (queryResult != null) { fillMap(result, queryResult); } @@ -362,8 +328,7 @@ public class MongoDbClient extends DB { @Override public int scan(String table, String startkey, int recordcount, Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { - FindIterable<Document> cursor = null; - MongoCursor<Document> iter = null; + MongoCursor<Document> cursor = null; try { MongoCollection<Document> collection = database .getCollection(table); @@ -380,18 +345,15 @@ public class MongoDbClient extends DB { } cursor = collection.withReadPreference(readPreference).find(query) - .projection(projection).sort(sort).limit(recordcount); - - // Do the query. - iter = cursor.iterator(); - if (!iter.hasNext()) { + .projection(projection).sort(sort).limit(recordcount).iterator(); + if (!cursor.hasNext()) { System.err.println("Nothing found in scan for key " + startkey); return 1; } - while (iter.hasNext()) { + while (cursor.hasNext()) { HashMap<String, ByteIterator> resultMap = new HashMap<String, ByteIterator>(); - Document obj = iter.next(); + Document obj = cursor.next(); fillMap(resultMap, obj); result.add(resultMap); @@ -404,8 +366,8 @@ public class MongoDbClient extends DB { return 1; } finally { - if (iter != null) { - iter.close(); + if (cursor != null) { + cursor.close(); } } }