From 0b68bd48fed091e824cdc9f729d91522ff4d0d66 Mon Sep 17 00:00:00 2001
From: Matias Cascallares <matiascas@gmail.com>
Date: Thu, 10 Oct 2013 12:13:00 +0800
Subject: [PATCH] added support to specify readPreference

---
 mongodb/README.md                             |  8 ++++
 .../com/yahoo/ycsb/db/AsyncMongoDbClient.java | 32 +++++++++++++
 .../java/com/yahoo/ycsb/db/MongoDbClient.java | 45 ++++++++++++++++---
 3 files changed, 79 insertions(+), 6 deletions(-)

diff --git a/mongodb/README.md b/mongodb/README.md
index f4dac7dc..9d2b0fb7 100644
--- a/mongodb/README.md
+++ b/mongodb/README.md
@@ -78,6 +78,14 @@ See the next section for the list of configuration parameters for MongoDB.
   - `journaled`
   - `replica_acknowledged`
 
+- `mongodb.readPreference` default `primary`
+ - options are :
+  - `primary`
+  - `primary_preferred`
+  - `secondary`
+  - `secondary_preferred`
+  - `nearest`
+
 - `mongodb.maxconnections` (default `100`)
 
 - `mongodb.threadsAllowedToBlockForConnectionMultiplier` (default `5`)
diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java
index 9c4a8da8..98c2cec0 100644
--- a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java
+++ b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java
@@ -24,6 +24,7 @@ import com.allanbank.mongodb.MongoDatabase;
 import com.allanbank.mongodb.MongoDbUri;
 import com.allanbank.mongodb.MongoFactory;
 import com.allanbank.mongodb.MongoIterator;
+import com.allanbank.mongodb.ReadPreference;
 import com.allanbank.mongodb.bson.Document;
 import com.allanbank.mongodb.bson.Element;
 import com.allanbank.mongodb.bson.ElementType;
@@ -68,6 +69,9 @@ public class AsyncMongoDbClient extends DB {
     /** The write concern for the requests. */
     private static Durability writeConcern;
 
+    /** Which servers to use for reads. */
+    private static ReadPreference readPreference;
+
     /** The database to MongoDB. */
     private MongoDatabase db;
 
@@ -173,6 +177,32 @@ public class AsyncMongoDbClient extends DB {
                 System.exit(1);
             }
 
+            // readPreference
+            String readPreferenceType = props.getProperty(
+                    "mongodb.readPreference", "primary").toLowerCase();
+            if ("primary".equals(readPreferenceType)) {
+                readPreference = ReadPreference.primary();
+            }
+            else if ("primary_preferred".equals(readPreferenceType)) {
+                readPreference = ReadPreference.preferPrimary();
+            }
+            else if ("secondary".equals(readPreferenceType)) {
+                readPreference = ReadPreference.secondary();
+            }
+            else if ("secondary_preferred".equals(readPreferenceType)) {
+                readPreference = ReadPreference.preferSecondary();
+            }
+            else if ("nearest".equals(readPreferenceType)) {
+                readPreference = ReadPreference.closest();
+            }
+            else {
+                System.err
+                        .println("ERROR: Invalid readPreference: '"
+                                + readPreferenceType
+                                + "'. Must be [ primary | primary_preferred | secondary | secondary_preferred | nearest ]");
+                System.exit(1);
+            }
+
             try {
                 // need to append db to url.
                 url += "/" + database;
@@ -261,6 +291,7 @@ public class AsyncMongoDbClient extends DB {
                 fb.projection(fieldsToReturn);
                 fb.setLimit(1);
                 fb.setBatchSize(1);
+                fb.readPreference(readPreference);
 
                 final MongoIterator<Document> ci = collection.find(fb.build());
                 if (ci.hasNext()) {
@@ -314,6 +345,7 @@ public class AsyncMongoDbClient extends DB {
             fb.setQuery(where("_id").greaterThanOrEqualTo(startkey));
             fb.setLimit(recordcount);
             fb.setBatchSize(recordcount);
+            fb.readPreference(readPreference);
             if (fields != null) {
                 final DocumentBuilder fieldsDoc = BuilderFactory.start();
                 for (final String field : fields) {
diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java
index 87005109..281c6b38 100644
--- a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java
+++ b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java
@@ -25,6 +25,7 @@ import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientOptions;
+import com.mongodb.ReadPreference;
 import com.mongodb.WriteConcern;
 import com.yahoo.ycsb.ByteArrayByteIterator;
 import com.yahoo.ycsb.ByteIterator;
@@ -53,6 +54,9 @@ public class MongoDbClient extends DB {
     /** The default write concern for the test. */
     private static WriteConcern writeConcern;
 
+    /** The default read preference for the test */
+    private static ReadPreference readPreference;
+
     /** The database to access. */
     private static String database;
 
@@ -88,10 +92,7 @@ public class MongoDbClient extends DB {
             }
 
             database = props.getProperty("mongodb.database", "ycsb");
-            String writeConcernType = props.getProperty("mongodb.writeConcern",
-                    "acknowledged").toLowerCase();
 
-            // Set connectionpool to size of ycsb thread pool
             final String maxConnections = props.getProperty(
                     "mongodb.maxconnections", "100");
             final String threadsAllowedToBlockForConnectionMultiplier = props
@@ -99,6 +100,9 @@ public class MongoDbClient extends DB {
                             "mongodb.threadsAllowedToBlockForConnectionMultiplier",
                             "5");
 
+            // write concern
+            String writeConcernType = props.getProperty("mongodb.writeConcern",
+                    "acknowledged").toLowerCase();
             if ("errors_ignored".equals(writeConcernType)) {
                 writeConcern = WriteConcern.ERRORS_IGNORED;
             }
@@ -123,12 +127,39 @@ public class MongoDbClient extends DB {
                 System.exit(1);
             }
 
+            // readPreference
+            String readPreferenceType = props.getProperty(
+                    "mongodb.readPreference", "primary").toLowerCase();
+            if ("primary".equals(readPreferenceType)) {
+                readPreference = ReadPreference.primary();
+            }
+            else if ("primary_preferred".equals(readPreferenceType)) {
+                readPreference = ReadPreference.primaryPreferred();
+            }
+            else if ("secondary".equals(readPreferenceType)) {
+                readPreference = ReadPreference.secondary();
+            }
+            else if ("secondary_preferred".equals(readPreferenceType)) {
+                readPreference = ReadPreference.secondaryPreferred();
+            }
+            else if ("nearest".equals(readPreferenceType)) {
+                readPreference = ReadPreference.nearest();
+            }
+            else {
+                System.err
+                        .println("ERROR: Invalid readPreference: '"
+                                + readPreferenceType
+                                + "'. Must be [ primary | primary_preferred | secondary | secondary_preferred | nearest ]");
+                System.exit(1);
+            }
+
             try {
                 // strip out prefix since Java driver doesn't currently support
                 // standard connection format URL yet
                 // http://www.mongodb.org/display/DOCS/Connections
                 if (url.startsWith("mongodb://")) {
                     url = url.substring(10);
+
                 }
 
                 // need to append db to url.
@@ -282,10 +313,11 @@ public class MongoDbClient extends DB {
                 while (iter.hasNext()) {
                     fieldsToReturn.put(iter.next(), INCLUDE);
                 }
-                queryResult = collection.findOne(q, fieldsToReturn);
+                queryResult = collection.findOne(q, fieldsToReturn,
+                        readPreference);
             }
             else {
-                queryResult = collection.findOne(q);
+                queryResult = collection.findOne(q, null, readPreference);
             }
 
             if (queryResult != null) {
@@ -382,7 +414,8 @@ public class MongoDbClient extends DB {
             // { "_id":{"$gte":startKey, "$lte":{"appId":key+"\uFFFF"}} }
             DBObject scanRange = new BasicDBObject().append("$gte", startkey);
             DBObject q = new BasicDBObject().append("_id", scanRange);
-            cursor = collection.find(q).limit(recordcount);
+            cursor = collection.find(q).setReadPreference(readPreference)
+                    .limit(recordcount);
             while (cursor.hasNext()) {
                 // toMap() returns a Map, but result.add() expects a
                 // Map<String,String>. Hence, the suppress warnings.
-- 
GitLab