From 92d86e74a8b3dbfe27dc9c73d93048174c4d2874 Mon Sep 17 00:00:00 2001
From: "Robert J. Moore" <Robert.J.Moore@allanbank.com>
Date: Sun, 9 Sep 2012 16:43:11 -0400
Subject: [PATCH] gh-95 Update the MongoDB driver to use a singleton Mongo
 instance for each test client.

Add a control (via properties) for the number of connections the Mongo
instance will create/use.
---
 mongodb/README.md                             |   2 +
 .../java/com/yahoo/ycsb/db/MongoDbClient.java | 284 +++++++++++-------
 2 files changed, 172 insertions(+), 114 deletions(-)

diff --git a/mongodb/README.md b/mongodb/README.md
index 63d0158a..8f10c657 100644
--- a/mongodb/README.md
+++ b/mongodb/README.md
@@ -40,3 +40,5 @@ See the next section for the list of configuration parameters for MongoDB.
 ### `mongodb.database` (default: `ycsb`)
 
 ### `mongodb.writeConcern` (default `safe`)
+
+### `mongodb.maxconnections` (default `10`)
diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java
index 7c8df19a..3082e4c9 100644
--- a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java
+++ b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java
@@ -11,10 +11,11 @@ package com.yahoo.ycsb.db;
 
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.Map;
 import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.mongodb.BasicDBObject;
 import com.mongodb.DBAddress;
@@ -22,102 +23,133 @@ import com.mongodb.DBCollection;
 import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
 import com.mongodb.Mongo;
+import com.mongodb.MongoOptions;
 import com.mongodb.WriteConcern;
 import com.mongodb.WriteResult;
-
+import com.yahoo.ycsb.ByteArrayByteIterator;
+import com.yahoo.ycsb.ByteIterator;
 import com.yahoo.ycsb.DB;
 import com.yahoo.ycsb.DBException;
-import com.yahoo.ycsb.ByteIterator;
-import com.yahoo.ycsb.StringByteIterator;
 
 /**
  * MongoDB client for YCSB framework.
- *
+ * 
  * Properties to set:
- *
- * mongodb.url=mongodb://localhost:27017
- * mongodb.database=ycsb
+ * 
+ * mongodb.url=mongodb://localhost:27017 mongodb.database=ycsb
  * mongodb.writeConcern=normal
- *
+ * 
  * @author ypai
- *
  */
 public class MongoDbClient extends DB {
 
-    private Mongo mongo;
-    private WriteConcern writeConcern;
-    private String database;
+    /** Used to include a field in a response. */
+    protected static final Integer INCLUDE = Integer.valueOf(1);
+
+    /** A singleton Mongo instance. */
+    private static Mongo mongo;
+
+    /** The default write concern for the test. */
+    private static WriteConcern writeConcern;
+
+    /** The database to access. */
+    private static String database;
+
+    /** Count the number of times initialized to teardown on the last {@link #cleanup()}. */
+    private static final AtomicInteger initCount = new AtomicInteger(0);
 
-    @Override
     /**
      * Initialize any state for this DB.
      * Called once per DB instance; there is one DB instance per client thread.
      */
+    @Override
     public void init() throws DBException {
-        // initialize MongoDb driver
-        Properties props = getProperties();
-        String url = props.getProperty("mongodb.url", "mongodb://localhost:27017");
-        database = props.getProperty("mongodb.database", "ycsb");
-        String writeConcernType = props.getProperty("mongodb.writeConcern", "safe").toLowerCase();
-
-        if ("none".equals(writeConcernType)) {
-            writeConcern = WriteConcern.NONE;
-        } else if ("safe".equals(writeConcernType)) {
-            writeConcern = WriteConcern.SAFE;
-        } else if ("normal".equals(writeConcernType)) {
-            writeConcern = WriteConcern.NORMAL;
-        } else if ("fsync_safe".equals(writeConcernType)) {
-            writeConcern = WriteConcern.FSYNC_SAFE;
-        } else if ("replicas_safe".equals(writeConcernType)) {
-            writeConcern = WriteConcern.REPLICAS_SAFE;
-        } else {
-            System.err.println("ERROR: Invalid writeConcern: '" + writeConcernType + "'. " +
-                "Must be [ none | safe | normal | fsync_safe | replicas_safe ]");
-            System.exit(1);
-        }
+        initCount.incrementAndGet();
+        synchronized (INCLUDE) {
+            if (mongo != null) {
+                return;
+            }
 
-        try {
-            // strip out prefix since Java driver doesn't currently support
-            // standard connection format URL yet
-            // http://www.mongodb.org/display/DOCS/Connections
-            if (url.startsWith("mongodb://")) {
-                url = url.substring(10);
+            // initialize MongoDb driver
+            Properties props = getProperties();
+            String url = props.getProperty("mongodb.url",
+                    "mongodb://localhost:27017");
+            database = props.getProperty("mongodb.database", "ycsb");
+            String writeConcernType = props.getProperty("mongodb.writeConcern",
+                    "safe").toLowerCase();
+            final String maxConnections = props.getProperty(
+                    "mongodb.maxconnections", "10");
+
+            if ("none".equals(writeConcernType)) {
+                writeConcern = WriteConcern.NONE;
+            }
+            else if ("safe".equals(writeConcernType)) {
+                writeConcern = WriteConcern.SAFE;
+            }
+            else if ("normal".equals(writeConcernType)) {
+                writeConcern = WriteConcern.NORMAL;
+            }
+            else if ("fsync_safe".equals(writeConcernType)) {
+                writeConcern = WriteConcern.FSYNC_SAFE;
+            }
+            else if ("replicas_safe".equals(writeConcernType)) {
+                writeConcern = WriteConcern.REPLICAS_SAFE;
+            }
+            else {
+                System.err
+                        .println("ERROR: Invalid writeConcern: '"
+                                + writeConcernType
+                                + "'. "
+                                + "Must be [ none | safe | normal | fsync_safe | replicas_safe ]");
+                System.exit(1);
             }
 
-            // need to append db to url.
-            url += "/"+database;
-            System.out.println("new database url = "+url);
-            mongo = new Mongo(new DBAddress(url));
-            System.out.println("mongo connection created with "+url);
-        } catch (Exception e1) {
-            System.err.println(
-                    "Could not initialize MongoDB connection pool for Loader: "
-                            + e1.toString());
-            e1.printStackTrace();
-            return;
-        }
+            try {
+                // strip out prefix since Java driver doesn't currently support
+                // standard connection format URL yet
+                // http://www.mongodb.org/display/DOCS/Connections
+                if (url.startsWith("mongodb://")) {
+                    url = url.substring(10);
+                }
 
+                // need to append db to url.
+                url += "/" + database;
+                System.out.println("new database url = " + url);
+                MongoOptions options = new MongoOptions();
+                options.connectionsPerHost = Integer.parseInt(maxConnections);
+                mongo = new Mongo(new DBAddress(url), options);
+
+                System.out.println("mongo connection created with " + url);
+            }
+            catch (Exception e1) {
+                System.err
+                        .println("Could not initialize MongoDB connection pool for Loader: "
+                                + e1.toString());
+                e1.printStackTrace();
+                return;
+            }
+        }
     }
-    
+
+    /**
+     * Cleanup any state for this DB.
+     * Called once per DB instance; there is one DB instance per client thread.
+     */
     @Override
-	/**
-	 * Cleanup any state for this DB.
-	 * Called once per DB instance; there is one DB instance per client thread.
-	 */
-	public void cleanup() throws DBException
-	{
-        try {
-        	mongo.close();
-        } catch (Exception e1) {
-        	System.err.println(
-                    "Could not close MongoDB connection pool: "
-                            + e1.toString());
-            e1.printStackTrace();
-            return;
+    public void cleanup() throws DBException {
+        if (initCount.decrementAndGet() <= 0) {
+            try {
+                mongo.close();
+            }
+            catch (Exception e1) {
+                System.err.println("Could not close MongoDB connection pool: "
+                        + e1.toString());
+                e1.printStackTrace();
+                return;
+            }
         }
-	}
+    }
 
-    @Override
     /**
      * Delete a record from the database.
      *
@@ -125,8 +157,9 @@ public class MongoDbClient extends DB {
      * @param key The record key of the record to delete.
      * @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes.
      */
+    @Override
     public int delete(String table, String key) {
-        com.mongodb.DB db=null;
+        com.mongodb.DB db = null;
         try {
             db = mongo.getDB(database);
             db.requestStart();
@@ -134,20 +167,18 @@ public class MongoDbClient extends DB {
             DBObject q = new BasicDBObject().append("_id", key);
             WriteResult res = collection.remove(q, writeConcern);
             return res.getN() == 1 ? 0 : 1;
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             System.err.println(e.toString());
             return 1;
         }
-        finally
-        {
-            if (db!=null)
-            {
+        finally {
+            if (db != null) {
                 db.requestDone();
             }
         }
     }
 
-    @Override
     /**
      * Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified
      * record key.
@@ -157,7 +188,9 @@ public class MongoDbClient extends DB {
      * @param values A HashMap of field/value pairs to insert in the record
      * @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes.
      */
-    public int insert(String table, String key, HashMap<String, ByteIterator> values) {
+    @Override
+    public int insert(String table, String key,
+            HashMap<String, ByteIterator> values) {
         com.mongodb.DB db = null;
         try {
             db = mongo.getDB(database);
@@ -166,24 +199,23 @@ public class MongoDbClient extends DB {
 
             DBCollection collection = db.getCollection(table);
             DBObject r = new BasicDBObject().append("_id", key);
-	    for(String k: values.keySet()) {
-		r.put(k, values.get(k).toArray());
-	    }
-            WriteResult res = collection.insert(r,writeConcern);
+            for (String k : values.keySet()) {
+                r.put(k, values.get(k).toArray());
+            }
+            WriteResult res = collection.insert(r, writeConcern);
             return res.getError() == null ? 0 : 1;
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             e.printStackTrace();
             return 1;
-        } finally {
-            if (db!=null)
-            {
+        }
+        finally {
+            if (db != null) {
                 db.requestDone();
             }
         }
     }
 
-    @Override
-    @SuppressWarnings("unchecked")
     /**
      * Read a record from the database. Each field/value pair from the result will be stored in a HashMap.
      *
@@ -193,6 +225,8 @@ public class MongoDbClient extends DB {
      * @param result A HashMap of field/value pairs for the result
      * @return Zero on success, a non-zero error code on error or "not found".
      */
+    @Override
+    @SuppressWarnings("unchecked")
     public int read(String table, String key, Set<String> fields,
             HashMap<String, ByteIterator> result) {
         com.mongodb.DB db = null;
@@ -204,16 +238,16 @@ public class MongoDbClient extends DB {
             DBCollection collection = db.getCollection(table);
             DBObject q = new BasicDBObject().append("_id", key);
             DBObject fieldsToReturn = new BasicDBObject();
-            boolean returnAllFields = fields == null;
 
             DBObject queryResult = null;
-            if (!returnAllFields) {
+            if (fields != null) {
                 Iterator<String> iter = fields.iterator();
                 while (iter.hasNext()) {
-                    fieldsToReturn.put(iter.next(), 1);
+                    fieldsToReturn.put(iter.next(), INCLUDE);
                 }
                 queryResult = collection.findOne(q, fieldsToReturn);
-            } else {
+            }
+            else {
                 queryResult = collection.findOne(q);
             }
 
@@ -221,19 +255,18 @@ public class MongoDbClient extends DB {
                 result.putAll(queryResult.toMap());
             }
             return queryResult != null ? 0 : 1;
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             System.err.println(e.toString());
             return 1;
-        } finally {
-            if (db!=null)
-            {
+        }
+        finally {
+            if (db != null) {
                 db.requestDone();
             }
         }
     }
 
-
-    @Override
     /**
      * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified
      * record key, overwriting any existing values with the same field name.
@@ -243,7 +276,9 @@ public class MongoDbClient extends DB {
      * @param values A HashMap of field/value pairs to update in the record
      * @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes.
      */
-    public int update(String table, String key, HashMap<String, ByteIterator> values) {
+    @Override
+    public int update(String table, String key,
+            HashMap<String, ByteIterator> values) {
         com.mongodb.DB db = null;
         try {
             db = mongo.getDB(database);
@@ -264,19 +299,18 @@ public class MongoDbClient extends DB {
             WriteResult res = collection.update(q, u, false, false,
                     writeConcern);
             return res.getN() == 1 ? 0 : 1;
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             System.err.println(e.toString());
             return 1;
-        } finally {
-            if (db!=null)
-            {
+        }
+        finally {
+            if (db != null) {
                 db.requestDone();
             }
         }
     }
 
-    @Override
-    @SuppressWarnings("unchecked")
     /**
      * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in a HashMap.
      *
@@ -287,9 +321,10 @@ public class MongoDbClient extends DB {
      * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
      * @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes.
      */
+    @Override
     public int scan(String table, String startkey, int recordcount,
             Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
-        com.mongodb.DB db=null;
+        com.mongodb.DB db = null;
         try {
             db = mongo.getDB(database);
             db.requestStart();
@@ -299,23 +334,44 @@ public class MongoDbClient extends DB {
             DBObject q = new BasicDBObject().append("_id", scanRange);
             DBCursor cursor = collection.find(q).limit(recordcount);
             while (cursor.hasNext()) {
-                //toMap() returns a Map, but result.add() expects a Map<String,String>. Hence, the suppress warnings.
-                result.add(StringByteIterator.getByteIteratorMap((Map<String,String>)cursor.next().toMap()));
+                // toMap() returns a Map, but result.add() expects a
+                // Map<String,String>. Hence, the suppress warnings.
+                HashMap<String, ByteIterator> resultMap = new HashMap<String, ByteIterator>();
+
+                DBObject obj = cursor.next();
+                fillMap(resultMap, obj);
+
+                result.add(resultMap);
             }
 
             return 0;
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             System.err.println(e.toString());
             return 1;
         }
-        finally
-        {
-            if (db!=null)
-            {
+        finally {
+            if (db != null) {
                 db.requestDone();
             }
         }
 
     }
-}
 
+    /**
+     * TODO - Finish
+     * 
+     * @param resultMap
+     * @param obj
+     */
+    @SuppressWarnings("unchecked")
+    protected void fillMap(HashMap<String, ByteIterator> resultMap, DBObject obj) {
+        Map<String, Object> objMap = obj.toMap();
+        for (Map.Entry<String, Object> entry : objMap.entrySet()) {
+            if (entry.getValue() instanceof byte[]) {
+                resultMap.put(entry.getKey(), new ByteArrayByteIterator(
+                        (byte[]) entry.getValue()));
+            }
+        }
+    }
+}
-- 
GitLab