From 6970ee1152c088852701f7b96223c935203b3730 Mon Sep 17 00:00:00 2001
From: "Robert J. Moore" <Robert.J.Moore@allanbank.com>
Date: Thu, 21 May 2015 23:44:13 -0400
Subject: [PATCH] Add majority write concern and deprecated most of the
 options. Migrated to the 3.0 driver from MongoDB Inc. and updated to the new
 Document and fluent APIs.

---
 mongodb/README.md                             |  37 ++-
 mongodb/pom.xml                               |   9 +-
 .../com/yahoo/ycsb/db/AsyncMongoDbClient.java | 178 ++++++++++-----
 .../java/com/yahoo/ycsb/db/MongoDbClient.java | 210 ++++++++++++------
 .../com/yahoo/ycsb/db/OptionsSupport.java     |  11 +-
 .../com/yahoo/ycsb/db/OptionsSupportTest.java |   4 +
 6 files changed, 324 insertions(+), 125 deletions(-)

diff --git a/mongodb/README.md b/mongodb/README.md
index cefec635..f6dd8f4c 100644
--- a/mongodb/README.md
+++ b/mongodb/README.md
@@ -77,14 +77,47 @@ See the next section for the list of configuration parameters for MongoDB.
 
 ## MongoDB Configuration Parameters
 
-- `mongodb.url` default: `mongodb://localhost:27017/ycsb?w=1`
+- `mongodb.url`
   - This should be a MongoDB URI or connection string. 
     - See http://docs.mongodb.org/manual/reference/connection-string/ for the standard options.
     - For the complete set of options for the asynchronous driver see: 
       - http://www.allanbank.com/mongodb-async-driver/apidocs/index.html?com/allanbank/mongodb/MongoDbUri.html
     - For the complete set of options for the synchronous driver see:
       - http://api.mongodb.org/java/current/index.html?com/mongodb/MongoClientURI.html
-
+  - Default value is `mongodb://localhost:27017/ycsb?w=1`
+
+- `mongodb.batchsize`
+  - Useful for the insert workload as it will submit the inserts in batches inproving throughput.
+  - Default value is `1`.
+
+- `mongodb.writeConcern`
+  - **Deprecated** - Use the `w` and `journal` options on the MongoDB URI provided by the `mongodb.uri`.
+  - Allowed values are :
+    - `errors_ignored`
+    - `unacknowledged`
+    - `acknowledged`
+    - `journaled`
+    - `replica_acknowledged`
+    - `majority`
+  - Default value is `acknowledged`.
+ 
+- `mongodb.readPreference`
+  - **Deprecated** - Use the `readPreference` options on the MongoDB URI provided by the `mongodb.uri`.
+  - Allowed values are :
+    - `primary`
+    - `primary_preferred`
+    - `secondary`
+    - `secondary_preferred`
+    - `nearest`
+  - Default value is `primary`.
+ 
+- `mongodb.maxconnections`
+  - **Deprecated** - Use the `maxPoolSize` options on the MongoDB URI provided by the `mongodb.uri`.
+  - Default value is `100`.
+
+- `mongodb.threadsAllowedToBlockForConnectionMultiplier`
+  - **Deprecated** - Use the `waitQueueMultiple` options on the MongoDB URI provided by the `mongodb.uri`.
+  - Default value is `5`.
 
 For example:
 
diff --git a/mongodb/pom.xml b/mongodb/pom.xml
index f9bb2201..148a4c74 100644
--- a/mongodb/pom.xml
+++ b/mongodb/pom.xml
@@ -9,7 +9,7 @@
 	</parent>
 
 	<artifactId>mongodb-binding</artifactId>
-	<name>Mongo DB Binding</name>
+	<name>MongoDB Binding</name>
 	<packaging>jar</packaging>
 
 	<dependencies>
@@ -33,6 +33,13 @@
 			<artifactId>logback-classic</artifactId>
 			<version>1.1.2</version>
 		</dependency>
+
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.12</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java
index f99f4f77..c57ff518 100644
--- a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java
+++ b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java
@@ -42,7 +42,10 @@ import com.allanbank.mongodb.bson.ElementType;
 import com.allanbank.mongodb.bson.builder.BuilderFactory;
 import com.allanbank.mongodb.bson.builder.DocumentBuilder;
 import com.allanbank.mongodb.bson.element.BinaryElement;
+import com.allanbank.mongodb.builder.BatchedWrite;
+import com.allanbank.mongodb.builder.BatchedWriteMode;
 import com.allanbank.mongodb.builder.Find;
+import com.allanbank.mongodb.builder.Sort;
 import com.yahoo.ycsb.ByteIterator;
 import com.yahoo.ycsb.DB;
 import com.yahoo.ycsb.DBException;
@@ -59,8 +62,11 @@ import com.yahoo.ycsb.DBException;
  */
 public class AsyncMongoDbClient extends DB {
 
+    /** Used to include a field in a response. */
+    protected static final int INCLUDE = 1;
+
     /** The database to use. */
-    private static String database;
+    private static String databaseName;
 
     /** Thread local document builder. */
     private static final ThreadLocal<DocumentBuilder> DOCUMENT_BUILDER = new ThreadLocal<DocumentBuilder>() {
@@ -74,7 +80,7 @@ public class AsyncMongoDbClient extends DB {
     private static final AtomicInteger initCount = new AtomicInteger(0);
 
     /** The connection to MongoDB. */
-    private static MongoClient mongo;
+    private static MongoClient mongoClient;
 
     /** The write concern for the requests. */
     private static Durability writeConcern;
@@ -83,7 +89,17 @@ public class AsyncMongoDbClient extends DB {
     private static ReadPreference readPreference;
 
     /** The database to MongoDB. */
-    private MongoDatabase db;
+    private MongoDatabase database;
+
+    /** The batch size to use for inserts. */
+    private static int batchSize;
+
+    /** The bulk inserts pending for the thread. */
+    private final BatchedWrite.Builder batchedWrite = BatchedWrite.builder()
+            .mode(BatchedWriteMode.REORDERED);
+
+    /** The number of writes in the batchedWrite. */
+    private int batchedWriteCount = 0;
 
     /**
      * Cleanup any state for this DB. Called once per DB instance; there is one
@@ -91,9 +107,9 @@ public class AsyncMongoDbClient extends DB {
      */
     @Override
     public final void cleanup() throws DBException {
-        if (initCount.decrementAndGet() <= 0) {
+        if (initCount.decrementAndGet() == 0) {
             try {
-                mongo.close();
+                mongoClient.close();
             }
             catch (final Exception e1) {
                 System.err.println("Could not close MongoDB connection pool: "
@@ -101,6 +117,10 @@ public class AsyncMongoDbClient extends DB {
                 e1.printStackTrace();
                 return;
             }
+            finally {
+                mongoClient = null;
+                database = null;
+            }
         }
     }
 
@@ -117,10 +137,14 @@ public class AsyncMongoDbClient extends DB {
     @Override
     public final int delete(final String table, final String key) {
         try {
-            final MongoCollection collection = db.getCollection(table);
+            final MongoCollection collection = database.getCollection(table);
             final Document q = BuilderFactory.start().add("_id", key).build();
             final long res = collection.delete(q, writeConcern);
-            return res == 1 ? 0 : 1;
+            if (res == 0) {
+                System.err.println("Nothing deleted for key " + key);
+                return 1;
+            }
+            return 0;
         }
         catch (final Exception e) {
             System.err.println(e.toString());
@@ -137,22 +161,27 @@ public class AsyncMongoDbClient extends DB {
         final int count = initCount.incrementAndGet();
 
         synchronized (AsyncMongoDbClient.class) {
-            if (mongo != null) {
-                db = mongo.getDatabase(database);
+            final Properties props = getProperties();
+
+            if (mongoClient != null) {
+                database = mongoClient.getDatabase(databaseName);
 
                 // If there are more threads (count) than connections then the
                 // Low latency spin lock is not really needed as we will keep
                 // the connections occupied.
-                if (count > mongo.getConfig().getMaxConnectionCount()) {
-                    mongo.getConfig().setLockType(LockType.MUTEX);
+                if (count > mongoClient.getConfig().getMaxConnectionCount()) {
+                    mongoClient.getConfig().setLockType(LockType.MUTEX);
                 }
 
                 return;
             }
 
+            // Set insert batchsize, default 1 - to be YCSB-original equivalent
+            batchSize = Integer.parseInt(props.getProperty("mongodb.batchsize", "1"));
+
             // Just use the standard connection format URL
-            // http://docs.mongodb.org/manual/reference/connection-string/
-            final Properties props = getProperties();
+            // http://docs.mongodatabase.org/manual/reference/connection-string/
+            // to configure the client.
             String url = props.getProperty("mongodb.url",
                     "mongodb://localhost:27017/ycsb?w=1");
             if (!url.startsWith("mongodb://")) {
@@ -168,8 +197,8 @@ public class AsyncMongoDbClient extends DB {
             MongoDbUri uri = new MongoDbUri(url);
 
             try {
-                database = uri.getDatabase();
-                if ((database == null) || database.isEmpty()) {
+                databaseName = uri.getDatabase();
+                if ((databaseName == null) || databaseName.isEmpty()) {
                     System.err
                             .println("ERROR: Invalid URL: '"
                                     + url
@@ -178,9 +207,9 @@ public class AsyncMongoDbClient extends DB {
                     System.exit(1);
                 }
 
-                mongo = MongoFactory.createClient(uri);
+                mongoClient = MongoFactory.createClient(uri);
 
-                MongoClientConfiguration config = mongo.getConfig();
+                MongoClientConfiguration config = mongoClient.getConfig();
                 if (!url.toLowerCase().contains("locktype=")) {
                     config.setLockType(LockType.LOW_LATENCY_SPIN); // assumed...
                 }
@@ -188,7 +217,7 @@ public class AsyncMongoDbClient extends DB {
                 readPreference = config.getDefaultReadPreference();
                 writeConcern = config.getDefaultDurability();
 
-                db = mongo.getDatabase(database);
+                database = mongoClient.getDatabase(databaseName);
 
                 System.out.println("mongo connection created with " + url);
             }
@@ -213,27 +242,61 @@ public class AsyncMongoDbClient extends DB {
      *            The record key of the record to insert.
      * @param values
      *            A HashMap of field/value pairs to insert in the record
-     * @return Zero on success, a non-zero error code on error. See this class's
-     *         description for a discussion of error codes.
+     * @return Zero on success, a non-zero error code on error. See the
+     *         {@link DB} class's description for a discussion of error codes.
      */
     @Override
     public final int insert(final String table, final String key,
             final HashMap<String, ByteIterator> values) {
         try {
-            final MongoCollection collection = db.getCollection(table);
-            final DocumentBuilder r = DOCUMENT_BUILDER.get().reset()
+            final MongoCollection collection = database.getCollection(table);
+            final DocumentBuilder toInsert = DOCUMENT_BUILDER.get().reset()
                     .add("_id", key);
-            final Document q = r.build();
+            final Document query = toInsert.build();
             for (final Map.Entry<String, ByteIterator> entry : values
                     .entrySet()) {
-                r.add(entry.getKey(), entry.getValue().toArray());
+                toInsert.add(entry.getKey(), entry.getValue().toArray());
             }
-            collection.insert(writeConcern, r);
 
-            collection.update(q, r, /* multi= */false, /* upsert= */true,
-                    writeConcern);
+            // Do an upsert.
+            if (batchSize <= 1) {
+                long result = collection.update(query, toInsert,
+                /* multi= */false, /* upsert= */true, writeConcern);
 
-            return 0;
+                return result == 1 ? 0 : 1;
+            }
+
+            // Use a bulk insert.
+            try {
+                batchedWrite.insert(toInsert);
+                batchedWriteCount += 1;
+
+                if (batchedWriteCount < batchSize) {
+                    return 0;
+                }
+
+                long count = collection.write(batchedWrite);
+                if (count == batchedWriteCount) {
+                    batchedWrite.reset().mode(BatchedWriteMode.REORDERED);
+                    batchedWriteCount = 0;
+                    return 0;
+                }
+
+                System.err
+                        .println("Number of inserted documents doesn't match the number sent, "
+                                + count
+                                + " inserted, sent "
+                                + batchedWriteCount);
+                batchedWrite.reset().mode(BatchedWriteMode.REORDERED);
+                batchedWriteCount = 0;
+                return 1;
+            }
+            catch (Exception e) {
+                System.err.println("Exception while trying bulk insert with "
+                        + batchedWriteCount);
+                e.printStackTrace();
+                return 1;
+            }
         }
         catch (final Exception e) {
             e.printStackTrace();
@@ -259,18 +322,19 @@ public class AsyncMongoDbClient extends DB {
     public final int read(final String table, final String key,
             final Set<String> fields, final HashMap<String, ByteIterator> result) {
         try {
-            final MongoCollection collection = db.getCollection(table);
-            final DocumentBuilder q = BuilderFactory.start().add("_id", key);
-            final DocumentBuilder fieldsToReturn = BuilderFactory.start();
+            final MongoCollection collection = database.getCollection(table);
+            final DocumentBuilder query = DOCUMENT_BUILDER.get().reset()
+                    .add("_id", key);
 
             Document queryResult = null;
             if (fields != null) {
+                final DocumentBuilder fieldsToReturn = BuilderFactory.start();
                 final Iterator<String> iter = fields.iterator();
                 while (iter.hasNext()) {
                     fieldsToReturn.add(iter.next(), 1);
                 }
 
-                final Find.Builder fb = new Find.Builder(q);
+                final Find.Builder fb = new Find.Builder(query);
                 fb.projection(fieldsToReturn);
                 fb.setLimit(1);
                 fb.setBatchSize(1);
@@ -283,7 +347,7 @@ public class AsyncMongoDbClient extends DB {
                 }
             }
             else {
-                queryResult = collection.findOne(q);
+                queryResult = collection.findOne(query);
             }
 
             if (queryResult != null) {
@@ -313,33 +377,37 @@ public class AsyncMongoDbClient extends DB {
      * @param result
      *            A Vector of HashMaps, where each HashMap is a set field/value
      *            pairs for one record
-     * @return Zero on success, a non-zero error code on error. See this class's
-     *         description for a discussion of error codes.
+     * @return Zero on success, a non-zero error code on error. See the
+     *         {@link DB} class's description for a discussion of error codes.
      */
     @Override
     public final int scan(final String table, final String startkey,
             final int recordcount, final Set<String> fields,
             final Vector<HashMap<String, ByteIterator>> result) {
         try {
-            final MongoCollection collection = db.getCollection(table);
-
-            // { "_id":{"$gte":startKey}} }
-            final Find.Builder fb = new Find.Builder();
-            fb.setQuery(where("_id").greaterThanOrEqualTo(startkey));
-            fb.setLimit(recordcount);
-            fb.setBatchSize(recordcount);
-            fb.readPreference(readPreference);
+            final MongoCollection collection = database.getCollection(table);
+
+            final Find.Builder find = Find.builder()
+                    .query(where("_id").greaterThanOrEqualTo(startkey))
+                    .limit(recordcount).batchSize(recordcount)
+                    .sort(Sort.asc("_id")).readPreference(readPreference);
+
             if (fields != null) {
                 final DocumentBuilder fieldsDoc = BuilderFactory.start();
                 for (final String field : fields) {
-                    fieldsDoc.add(field, 1);
+                    fieldsDoc.add(field, INCLUDE);
                 }
 
-                fb.projection(fieldsDoc);
+                find.projection(fieldsDoc);
             }
 
             result.ensureCapacity(recordcount);
-            final MongoIterator<Document> cursor = collection.find(fb.build());
+
+            final MongoIterator<Document> cursor = collection.find(find);
+            if (!cursor.hasNext()) {
+                System.err.println("Nothing found in scan for key " + startkey);
+                return 1;
+            }
             while (cursor.hasNext()) {
                 // toMap() returns a Map but result.add() expects a
                 // Map<String,String>. Hence, the suppress warnings.
@@ -370,23 +438,25 @@ public class AsyncMongoDbClient extends DB {
      *            The record key of the record to write.
      * @param values
      *            A HashMap of field/value pairs to update in the record
-     * @return Zero on success, a non-zero error code on error. See this class's
-     *         description for a discussion of error codes.
+     * @return Zero on success, a non-zero error code on error. See the
+     *         {@link DB} class's description for a discussion of error codes.
      */
     @Override
     public final int update(final String table, final String key,
             final HashMap<String, ByteIterator> values) {
         try {
-            final MongoCollection collection = db.getCollection(table);
-            final DocumentBuilder q = BuilderFactory.start().add("_id", key);
-            final DocumentBuilder u = BuilderFactory.start();
-            final DocumentBuilder fieldsToSet = u.push("$set");
+            final MongoCollection collection = database.getCollection(table);
+            final DocumentBuilder query = BuilderFactory.start()
+                    .add("_id", key);
+            final DocumentBuilder update = BuilderFactory.start();
+            final DocumentBuilder fieldsToSet = update.push("$set");
+
             for (final Map.Entry<String, ByteIterator> entry : values
                     .entrySet()) {
                 fieldsToSet.add(entry.getKey(), entry.getValue().toArray());
             }
-            final long res = collection
-                    .update(q, u, false, false, writeConcern);
+            final long res = collection.update(query, update, false, false,
+                    writeConcern);
             return res == 1 ? 0 : 1;
         }
         catch (final Exception e) {
diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java
index 98f0bd3e..511efda2 100644
--- a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java
+++ b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java
@@ -3,14 +3,16 @@
  *
  * Submitted by Yen Pai on 5/11/2010.
  *
- * https://gist.github.com/000a66b8db2caf42467b#file_mongo_db.java
+ * https://gist.github.com/000a66b8db2caf42467b#file_mongo_database.java
  *
  */
 
 package com.yahoo.ycsb.db;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -23,11 +25,16 @@ import com.mongodb.MongoClient;
 import com.mongodb.MongoClientURI;
 import com.mongodb.ReadPreference;
 import com.mongodb.WriteConcern;
+import com.mongodb.bulk.BulkWriteResult;
 import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.InsertOneModel;
 import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.result.DeleteResult;
+import com.mongodb.client.result.UpdateResult;
 import com.yahoo.ycsb.ByteArrayByteIterator;
 import com.yahoo.ycsb.ByteIterator;
 import com.yahoo.ycsb.DB;
@@ -38,18 +45,25 @@ import com.yahoo.ycsb.DBException;
  * 
  * Properties to set:
  * 
- * mongodb.url=mongodb://localhost:27017 mongodb.database=ycsb
- * mongodb.writeConcern=acknowledged
+ * mongodatabase.url=mongodb://localhost:27017 mongodatabase.database=ycsb
+ * mongodatabase.writeConcern=acknowledged
  * 
  * @author ypai
  */
 public class MongoDbClient extends DB {
 
+    /** Update options to do an upsert. */
+    private static final UpdateOptions UPSERT = new UpdateOptions()
+            .upsert(true);
+
     /** Used to include a field in a response. */
     protected static final Integer INCLUDE = Integer.valueOf(1);
 
-    /** The database to access. */
-    private static String database;
+    /** The database name to access. */
+    private static String databaseName;
+
+    /** The database name to access. */
+    private static MongoDatabase database;
 
     /**
      * Count the number of times initialized to teardown on the last
@@ -58,7 +72,7 @@ public class MongoDbClient extends DB {
     private static final AtomicInteger initCount = new AtomicInteger(0);
 
     /** A singleton Mongo instance. */
-    private static MongoClient mongo;
+    private static MongoClient mongoClient;
 
     /** The default read preference for the test */
     private static ReadPreference readPreference;
@@ -66,15 +80,21 @@ public class MongoDbClient extends DB {
     /** The default write concern for the test. */
     private static WriteConcern writeConcern;
 
+    /** The batch size to use for inserts. */
+    private static int batchSize;
+
+    /** The bulk inserts pending for the thread. */
+    private final List<InsertOneModel<Document>> bulkInserts = new ArrayList<InsertOneModel<Document>>();
+
     /**
      * Cleanup any state for this DB. Called once per DB instance; there is one
      * DB instance per client thread.
      */
     @Override
     public void cleanup() throws DBException {
-        if (initCount.decrementAndGet() <= 0) {
+        if (initCount.decrementAndGet() == 0) {
             try {
-                mongo.close();
+                mongoClient.close();
             }
             catch (Exception e1) {
                 System.err.println("Could not close MongoDB connection pool: "
@@ -82,6 +102,10 @@ public class MongoDbClient extends DB {
                 e1.printStackTrace();
                 return;
             }
+            finally {
+                database = null;
+                mongoClient = null;
+            }
         }
     }
 
@@ -92,19 +116,22 @@ public class MongoDbClient extends DB {
      *            The name of the table
      * @param key
      *            The record key of the record to delete.
-     * @return Zero on success, a non-zero error code on error. See this class's
-     *         description for a discussion of error codes.
+     * @return Zero on success, a non-zero error code on error. See the
+     *         {@link DB} class's description for a discussion of error codes.
      */
     @Override
     public int delete(String table, String key) {
-        MongoDatabase db = null;
         try {
-            db = mongo.getDatabase(database);
-            MongoCollection<Document> collection = db.getCollection(table);
-
-            Document q = new Document("_id", key);
-            collection.withWriteConcern(writeConcern).deleteOne(q);
-
+            MongoCollection<Document> collection = database
+                    .getCollection(table);
+
+            Document query = new Document("_id", key);
+            DeleteResult result = collection.withWriteConcern(writeConcern)
+                    .deleteOne(query);
+            if (result.getDeletedCount() == 0) {
+                System.err.println("Nothing deleted for key " + key);
+                return 1;
+            }
             return 0;
         }
         catch (Exception e) {
@@ -121,16 +148,21 @@ public class MongoDbClient extends DB {
     public void init() throws DBException {
         initCount.incrementAndGet();
         synchronized (INCLUDE) {
-            if (mongo != null) {
+            if (mongoClient != null) {
                 return;
             }
 
+            Properties props = getProperties();
+
+            // Set insert batchsize, default 1 - to be YCSB-original equivalent
+            batchSize = Integer.parseInt(props.getProperty("batchsize", "1"));
+
             // Just use the standard connection format URL
-            // http://docs.mongodb.org/manual/reference/connection-string/
+            // http://docs.mongodatabase.org/manual/reference/connection-string/
+            // to configure the client.
             //
             // Support legacy options by updating the URL as appropriate.
-            Properties props = getProperties();
-            String url = props.getProperty("mongodb.url", null);
+            String url = props.getProperty("mongodatabase.url", null);
             boolean defaultedUrl = false;
             if (url == null) {
                 defaultedUrl = true;
@@ -145,7 +177,7 @@ public class MongoDbClient extends DB {
                                 + url
                                 + "'. Must be of the form "
                                 + "'mongodb://<host1>:<port1>,<host2>:<port2>/database?options'. "
-                                + "See http://docs.mongodb.org/manual/reference/connection-string/.");
+                                + "See http://docs.mongodatabase.org/manual/reference/connection-string/.");
                 System.exit(1);
             }
 
@@ -155,13 +187,14 @@ public class MongoDbClient extends DB {
                 String uriDb = uri.getDatabase();
                 if (!defaultedUrl && (uriDb != null) && !uriDb.isEmpty()
                         && !"admin".equals(uriDb)) {
-                    database = uriDb;
+                    databaseName = uriDb;
                 }
                 else {
-                    database = props.getProperty("mongodb.database", "ycsb");
+                    databaseName = props.getProperty("mongodatabase.database",
+                            "ycsb");
                 }
 
-                if ((database == null) || database.isEmpty()) {
+                if ((databaseName == null) || databaseName.isEmpty()) {
                     System.err
                             .println("ERROR: Invalid URL: '"
                                     + url
@@ -173,9 +206,11 @@ public class MongoDbClient extends DB {
                 readPreference = uri.getOptions().getReadPreference();
                 writeConcern = uri.getOptions().getWriteConcern();
 
-                mongo = new MongoClient(uri);
+                mongoClient = new MongoClient(uri);
+                database = mongoClient.getDatabase(databaseName);
 
-                System.out.println("mongo connection created with " + url);
+                System.out.println("mongo client connection created with "
+                        + url);
             }
             catch (Exception e1) {
                 System.err
@@ -198,32 +233,67 @@ public class MongoDbClient extends DB {
      *            The record key of the record to insert.
      * @param values
      *            A HashMap of field/value pairs to insert in the record
-     * @return Zero on success, a non-zero error code on error. See this class's
-     *         description for a discussion of error codes.
+     * @return Zero on success, a non-zero error code on error. See the
+     *         {@link DB} class's description for a discussion of error codes.
      */
     @Override
     public int insert(String table, String key,
             HashMap<String, ByteIterator> values) {
-        MongoDatabase db = null;
         try {
-            db = mongo.getDatabase(database);
-
-            MongoCollection<Document> collection = db.getCollection(table);
+            MongoCollection<Document> collection = database
+                    .getCollection(table);
             Document criteria = new Document("_id", key);
             Document toInsert = new Document("_id", key);
             for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
                 toInsert.put(entry.getKey(), entry.getValue().toArray());
             }
 
-            collection.withWriteConcern(writeConcern).updateOne(criteria,
-                    toInsert, new UpdateOptions().upsert(true));
+            // Do a single upsert.
+            if (batchSize <= 1) {
+                UpdateResult result = collection.withWriteConcern(writeConcern)
+                        .updateOne(criteria, toInsert, UPSERT);
+                if (result.getMatchedCount() > 0
+                        || result.getModifiedCount() > 0) {
+                    return 0;
+                }
+                System.err.println("Nothing inserted for key " + key);
+                return 1;
+            }
 
-            return 0;
+            // Use a bulk insert.
+            try {
+                bulkInserts.add(new InsertOneModel<Document>(toInsert));
+                if (bulkInserts.size() < batchSize) {
+                    return 0;
+                }
+
+                BulkWriteResult result = collection.withWriteConcern(
+                        writeConcern).bulkWrite(bulkInserts,
+                        new BulkWriteOptions().ordered(false));
+                if (result.getInsertedCount() == bulkInserts.size()) {
+                    bulkInserts.clear();
+                    return 0;
+                }
+
+                System.err
+                        .println("Number of inserted documents doesn't match the number sent, "
+                                + result.getInsertedCount()
+                                + " inserted, sent " + bulkInserts.size());
+                bulkInserts.clear();
+                return 1;
+            }
+            catch (Exception e) {
+                System.err.println("Exception while trying bulk insert with "
+                        + bulkInserts.size());
+                e.printStackTrace();
+                return 1;
+            }
         }
         catch (Exception e) {
             e.printStackTrace();
             return 1;
         }
+
     }
 
     /**
@@ -243,12 +313,10 @@ public class MongoDbClient extends DB {
     @Override
     public int read(String table, String key, Set<String> fields,
             HashMap<String, ByteIterator> result) {
-        MongoDatabase db = null;
         try {
-            db = mongo.getDatabase(database);
-
-            MongoCollection<Document> collection = db.getCollection(table);
-            Document q = new Document("_id", key);
+            MongoCollection<Document> collection = database
+                    .getCollection(table);
+            Document query = new Document("_id", key);
             Document fieldsToReturn = new Document();
 
             Document queryResult = null;
@@ -258,11 +326,11 @@ public class MongoDbClient extends DB {
                     fieldsToReturn.put(iter.next(), INCLUDE);
                 }
                 queryResult = collection.withReadPreference(readPreference)
-                        .find(q).projection(fieldsToReturn).first();
+                        .find(query).projection(fieldsToReturn).first();
             }
             else {
                 queryResult = collection.withReadPreference(readPreference)
-                        .find(q).first();
+                        .find(query).first();
             }
 
             if (queryResult != null) {
@@ -291,30 +359,39 @@ public class MongoDbClient extends DB {
      * @param result
      *            A Vector of HashMaps, where each HashMap is a set field/value
      *            pairs for one record
-     * @return Zero on success, a non-zero error code on error. See this class's
-     *         description for a discussion of error codes.
+     * @return Zero on success, a non-zero error code on error. See the
+     *         {@link DB} class's description for a discussion of error codes.
      */
     @Override
     public int scan(String table, String startkey, int recordcount,
             Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
-        MongoDatabase db = null;
         FindIterable<Document> cursor = null;
         MongoCursor<Document> iter = null;
         try {
-            db = mongo.getDatabase(database);
-
-            MongoCollection<Document> collection = db.getCollection(table);
+            MongoCollection<Document> collection = database
+                    .getCollection(table);
 
-            // { "_id":{"$gte":startKey, "$lte":{"appId":key+"\uFFFF"}} }
             Document scanRange = new Document("$gte", startkey);
-            Document q = new Document("_id", scanRange);
-            cursor = collection.withReadPreference(readPreference).find(q)
-                    .limit(recordcount);
+            Document query = new Document("_id", scanRange);
+            Document sort = new Document("_id", INCLUDE);
+            Document projection = null;
+            if (fields != null) {
+                projection = new Document();
+                for (String fieldName : fields) {
+                    projection.put(fieldName, INCLUDE);
+                }
+            }
 
+            cursor = collection.withReadPreference(readPreference).find(query)
+                    .projection(projection).sort(sort).limit(recordcount);
+
+            // Do the query.
             iter = cursor.iterator();
+            if (!iter.hasNext()) {
+                System.err.println("Nothing found in scan for key " + startkey);
+                return 1;
+            }
             while (iter.hasNext()) {
-                // toMap() returns a Map, but result.add() expects a
-                // Map<String,String>. Hence, the suppress warnings.
                 HashMap<String, ByteIterator> resultMap = new HashMap<String, ByteIterator>();
 
                 Document obj = iter.next();
@@ -353,22 +430,23 @@ public class MongoDbClient extends DB {
     @Override
     public int update(String table, String key,
             HashMap<String, ByteIterator> values) {
-        MongoDatabase db = null;
         try {
-            db = mongo.getDatabase(database);
-
-            MongoCollection<Document> collection = db.getCollection(table);
-            Document q = new Document("_id", key);
+            MongoCollection<Document> collection = database
+                    .getCollection(table);
 
+            Document query = new Document("_id", key);
             Document fieldsToSet = new Document();
-            Iterator<String> keys = values.keySet().iterator();
-            while (keys.hasNext()) {
-                String tmpKey = keys.next();
-                fieldsToSet.put(tmpKey, values.get(tmpKey).toArray());
+            for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
+                fieldsToSet.put(entry.getKey(), entry.getValue().toArray());
             }
-            Document u = new Document("$set", fieldsToSet);
+            Document update = new Document("$set", fieldsToSet);
 
-            collection.withWriteConcern(writeConcern).updateOne(q, u);
+            UpdateResult result = collection.withWriteConcern(writeConcern)
+                    .updateOne(query, update);
+            if (result.getMatchedCount() == 0) {
+                System.err.println("Nothing updated for key " + key);
+                return 1;
+            }
             return 0;
         }
         catch (Exception e) {
diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/OptionsSupport.java b/mongodb/src/main/java/com/yahoo/ycsb/db/OptionsSupport.java
index 7f81f695..b8aa6f4d 100644
--- a/mongodb/src/main/java/com/yahoo/ycsb/db/OptionsSupport.java
+++ b/mongodb/src/main/java/com/yahoo/ycsb/db/OptionsSupport.java
@@ -81,9 +81,14 @@ public final class OptionsSupport {
             else if ("replica_acknowledged".equals(writeConcernType)) {
                 result = addUrlOption(result, "w", "2");
             }
+            else if ("majority".equals(writeConcernType)) {
+                result = addUrlOption(result, "w", "majority");
+            }
             else {
                 System.err.println("WARNING: Invalid writeConcern: '"
-                        + writeConcernType + "' will be ignored.");
+                        + writeConcernType + "' will be ignored. "
+                        + "Must be one of [ unacknowledged | acknowledged | "
+                        + "journaled | replica_acknowledged | majority ]");
             }
         }
 
@@ -110,7 +115,9 @@ public final class OptionsSupport {
             }
             else {
                 System.err.println("WARNING: Invalid readPreference: '"
-                        + readPreferenceType + "' will be ignored.");
+                        + readPreferenceType + "' will be ignored. "
+                        + "Must be one of [ primary | primary_preferred | "
+                        + "secondary | secondary_preferred | nearest ]");
             }
         }
 
diff --git a/mongodb/src/test/java/com/yahoo/ycsb/db/OptionsSupportTest.java b/mongodb/src/test/java/com/yahoo/ycsb/db/OptionsSupportTest.java
index b66525c1..20829d55 100644
--- a/mongodb/src/test/java/com/yahoo/ycsb/db/OptionsSupportTest.java
+++ b/mongodb/src/test/java/com/yahoo/ycsb/db/OptionsSupportTest.java
@@ -115,6 +115,10 @@ public class OptionsSupportTest {
                 updateUrl("mongodb://locahost:27017/?foo=bar",
                         props("mongodb.writeConcern", "replica_acknowledged")),
                 is("mongodb://locahost:27017/?foo=bar&w=2"));
+        assertThat(
+                updateUrl("mongodb://locahost:27017/?foo=bar",
+                        props("mongodb.writeConcern", "majority")),
+                is("mongodb://locahost:27017/?foo=bar&w=majority"));
 
         // w already exists.
         assertThat(
-- 
GitLab