Skip to content
Snippets Groups Projects
Commit 3b9e207c authored by Robert J. Moore's avatar Robert J. Moore
Browse files

Update the MongoDB drivers to just use a connection string or MongoDB

URI to configure the client. Old properties are deprecated and but will
be added to the URI if present and the URI does not already contain a
value for the setting.
parent 03b25308
No related branches found
No related tags found
No related merge requests found
......@@ -77,34 +77,19 @@ See the next section for the list of configuration parameters for MongoDB.
## MongoDB Configuration Parameters
- `mongodb.url` default: `mongodb://localhost:27017`
- `mongodb.url` default: `mongodb://localhost:27017/ycsb?w=1`
- This should be a MongoDB URI or connection string.
- See http://docs.mongodb.org/manual/reference/connection-string/ for the standard options.
- For the complete set of options for the asynchronous driver see:
- http://www.allanbank.com/mongodb-async-driver/apidocs/index.html?com/allanbank/mongodb/MongoDbUri.html
- For the complete set of options for the synchronous driver see:
- http://api.mongodb.org/java/current/index.html?com/mongodb/MongoClientURI.html
- `mongodb.database` default: `ycsb`
- `mongodb.writeConcern` default `acknowledged`
- options are :
- `errors_ignored`
- `unacknowledged`
- `acknowledged`
- `journaled`
- `replica_acknowledged`
- `mongodb.readPreference` default `primary`
- options are :
- `primary`
- `primary_preferred`
- `secondary`
- `secondary_preferred`
- `nearest`
- `mongodb.maxconnections` (default `100`)
- `mongodb.threadsAllowedToBlockForConnectionMultiplier` (default `5`)
For example:
./bin/ycsb load mongodb-async -s -P workloads/workloada -p mongodb.writeConcern=unacknowledged
./bin/ycsb load mongodb-async -s -P workloads/workloada -p mongodb.url=mongodb://localhost:27017/ycsb?w=0
To run with the synchronous driver from MongoDB Inc.:
./bin/ycsb load mongodb -s -P workloads/workloada -p mongodb.writeConcern=unacknowledged
./bin/ycsb load mongodb -s -P workloads/workloada -p mongodb.url=mongodb://localhost:27017/ycsb?w=0
......@@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.allanbank.mongodb.Durability;
import com.allanbank.mongodb.LockType;
import com.allanbank.mongodb.MongoClient;
import com.allanbank.mongodb.MongoClientConfiguration;
import com.allanbank.mongodb.MongoCollection;
import com.allanbank.mongodb.MongoDatabase;
import com.allanbank.mongodb.MongoDbUri;
......@@ -126,11 +127,6 @@ public class AsyncMongoDbClient extends DB {
public final void init() throws DBException {
final int count = initCount.incrementAndGet();
final Properties props = getProperties();
final String maxConnections = props.getProperty(
"mongodb.maxconnections", "100");
final int connections = Integer.parseInt(maxConnections);
synchronized (AsyncMongoDbClient.class) {
if (mongo != null) {
db = mongo.getDatabase(database);
......@@ -138,78 +134,51 @@ public class AsyncMongoDbClient extends DB {
// If there are more threads (count) than connections then the
// Low latency spin lock is not really needed as we will keep
// the connections occupied.
if (count > connections) {
if (count > mongo.getConfig().getMaxConnectionCount()) {
mongo.getConfig().setLockType(LockType.MUTEX);
}
return;
}
// initialize MongoDb driver
// Just use the standard connection format URL
// http://docs.mongodb.org/manual/reference/connection-string/
final Properties props = getProperties();
String url = props.getProperty("mongodb.url",
"mongodb://localhost:27017");
database = props.getProperty("mongodb.database", "ycsb");
String writeConcernType = props.getProperty("mongodb.writeConcern",
props.getProperty("mongodb.durability", "acknowledged"))
.toLowerCase();
if ("errors_ignored".equals(writeConcernType)) {
writeConcern = Durability.NONE;
}
else if ("unacknowledged".equals(writeConcernType)) {
writeConcern = Durability.NONE;
}
else if ("acknowledged".equals(writeConcernType)) {
writeConcern = Durability.ACK;
}
else if ("journaled".equals(writeConcernType)) {
writeConcern = Durability.journalDurable(0);
}
else if ("replica_acknowledged".equals(writeConcernType)) {
writeConcern = Durability.replicaDurable(2, 0);
}
else {
"mongodb://localhost:27017/ycsb?w=1");
if (!url.startsWith("mongodb://")) {
System.err
.println("ERROR: Invalid writeConcern: '"
+ writeConcernType
+ "'. "
+ "Must be [ errors_ignored | unacknowledged | acknowledged | journaled | replica_acknowledged ]");
.println("ERROR: Invalid URL: '"
+ url
+ "'. Must be of the form "
+ "'mongodb://<host1>:<port1>,<host2>:<port2>/database?options'. "
+ "See http://docs.mongodb.org/manual/reference/connection-string/.");
System.exit(1);
}
// readPreference
String readPreferenceType = props.getProperty(
"mongodb.readPreference", "primary").toLowerCase();
if ("primary".equals(readPreferenceType)) {
readPreference = ReadPreference.primary();
}
else if ("primary_preferred".equals(readPreferenceType)) {
readPreference = ReadPreference.preferPrimary();
}
else if ("secondary".equals(readPreferenceType)) {
readPreference = ReadPreference.secondary();
}
else if ("secondary_preferred".equals(readPreferenceType)) {
readPreference = ReadPreference.preferSecondary();
}
else if ("nearest".equals(readPreferenceType)) {
readPreference = ReadPreference.closest();
}
else {
System.err
.println("ERROR: Invalid readPreference: '"
+ readPreferenceType
+ "'. Must be [ primary | primary_preferred | secondary | secondary_preferred | nearest ]");
System.exit(1);
}
MongoDbUri uri = new MongoDbUri(url);
try {
// need to append db to url.
url += "/" + database;
System.out.println("new database url = " + url);
mongo = MongoFactory.createClient(new MongoDbUri(url));
mongo.getConfig().setMaxConnectionCount(connections);
mongo.getConfig().setLockType(LockType.LOW_LATENCY_SPIN); // assumed...
database = uri.getDatabase();
if ((database == null) || database.isEmpty()) {
System.err
.println("ERROR: Invalid URL: '"
+ url
+ "'. Must provide a database name with the URI. "
+ "'mongodb://<host1>:<port1>,<host2>:<port2>/database");
System.exit(1);
}
mongo = MongoFactory.createClient(uri);
MongoClientConfiguration config = mongo.getConfig();
if (!url.toLowerCase().contains("locktype=")) {
config.setLockType(LockType.LOW_LATENCY_SPIN); // assumed...
}
readPreference = config.getDefaultReadPreference();
writeConcern = config.getDefaultDurability();
db = mongo.getDatabase(database);
System.out.println("mongo connection created with " + url);
......
......@@ -19,12 +19,11 @@ import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
import com.mongodb.BasicDBObject;
import com.mongodb.DBAddress;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientURI;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.yahoo.ycsb.ByteArrayByteIterator;
......@@ -32,7 +31,6 @@ import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
/**
* MongoDB client for YCSB framework.
*
......@@ -48,15 +46,6 @@ public class MongoDbClient extends DB {
/** Used to include a field in a response. */
protected static final Integer INCLUDE = Integer.valueOf(1);
/** A singleton Mongo instance. */
private static MongoClient mongo;
/** The default write concern for the test. */
private static WriteConcern writeConcern;
/** The default read preference for the test */
private static ReadPreference readPreference;
/** The database to access. */
private static String database;
......@@ -66,125 +55,14 @@ public class MongoDbClient extends DB {
*/
private static final AtomicInteger initCount = new AtomicInteger(0);
/**
* Initialize any state for this DB. Called once per DB instance; there is
* one DB instance per client thread.
*/
@Override
public void init() throws DBException {
initCount.incrementAndGet();
synchronized (INCLUDE) {
if (mongo != null) {
return;
}
// initialize MongoDb driver
Properties props = getProperties();
String url = props.getProperty("mongodb.url",
"mongodb://localhost:27017");
if (url.contains(",")) {
//pick one and random
String[] urls = url.split(",");
int index = new Random().nextInt(urls.length);
url = urls[index];
System.out.printf("Using Mongo URL: %s\n", url);
}
database = props.getProperty("mongodb.database", "ycsb");
final String maxConnections = props.getProperty(
"mongodb.maxconnections", "100");
final String threadsAllowedToBlockForConnectionMultiplier = props
.getProperty(
"mongodb.threadsAllowedToBlockForConnectionMultiplier",
"5");
// write concern
String writeConcernType = props.getProperty("mongodb.writeConcern",
"acknowledged").toLowerCase();
if ("errors_ignored".equals(writeConcernType)) {
writeConcern = WriteConcern.ERRORS_IGNORED;
}
else if ("unacknowledged".equals(writeConcernType)) {
writeConcern = WriteConcern.UNACKNOWLEDGED;
}
else if ("acknowledged".equals(writeConcernType)) {
writeConcern = WriteConcern.ACKNOWLEDGED;
}
else if ("journaled".equals(writeConcernType)) {
writeConcern = WriteConcern.JOURNALED;
}
else if ("replica_acknowledged".equals(writeConcernType)) {
writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED;
}
else {
System.err
.println("ERROR: Invalid writeConcern: '"
+ writeConcernType
+ "'. "
+ "Must be [ errors_ignored | unacknowledged | acknowledged | journaled | replica_acknowledged ]");
System.exit(1);
}
// readPreference
String readPreferenceType = props.getProperty(
"mongodb.readPreference", "primary").toLowerCase();
if ("primary".equals(readPreferenceType)) {
readPreference = ReadPreference.primary();
}
else if ("primary_preferred".equals(readPreferenceType)) {
readPreference = ReadPreference.primaryPreferred();
}
else if ("secondary".equals(readPreferenceType)) {
readPreference = ReadPreference.secondary();
}
else if ("secondary_preferred".equals(readPreferenceType)) {
readPreference = ReadPreference.secondaryPreferred();
}
else if ("nearest".equals(readPreferenceType)) {
readPreference = ReadPreference.nearest();
}
else {
System.err
.println("ERROR: Invalid readPreference: '"
+ readPreferenceType
+ "'. Must be [ primary | primary_preferred | secondary | secondary_preferred | nearest ]");
System.exit(1);
}
try {
// strip out prefix since Java driver doesn't currently support
// standard connection format URL yet
// http://www.mongodb.org/display/DOCS/Connections
if (url.startsWith("mongodb://")) {
url = url.substring(10);
}
/** A singleton Mongo instance. */
private static MongoClient mongo;
// need to append db to url.
url += "/" + database;
System.out.println("new database url = " + url);
MongoClientOptions options = MongoClientOptions
.builder()
.cursorFinalizerEnabled(false)
.connectionsPerHost(Integer.parseInt(maxConnections))
.threadsAllowedToBlockForConnectionMultiplier(
Integer.parseInt(threadsAllowedToBlockForConnectionMultiplier))
.build();
mongo = new MongoClient(new DBAddress(url), options);
/** The default read preference for the test */
private static ReadPreference readPreference;
System.out.println("mongo connection created with " + url);
}
catch (Exception e1) {
System.err
.println("Could not initialize MongoDB connection pool for Loader: "
+ e1.toString());
e1.printStackTrace();
return;
}
}
}
/** The default write concern for the test. */
private static WriteConcern writeConcern;
/**
* Cleanup any state for this DB. Called once per DB instance; there is one
......@@ -237,6 +115,80 @@ public class MongoDbClient extends DB {
}
}
/**
* Initialize any state for this DB. Called once per DB instance; there is
* one DB instance per client thread.
*/
@Override
public void init() throws DBException {
initCount.incrementAndGet();
synchronized (INCLUDE) {
if (mongo != null) {
return;
}
// Just use the standard connection format URL
// http://docs.mongodb.org/manual/reference/connection-string/
//
// Support legacy options by updating the URL as appropriate.
Properties props = getProperties();
String url = props.getProperty("mongodb.url", null);
boolean defaultedUrl = false;
if (url == null) {
defaultedUrl = true;
url = "mongodb://localhost:27017/ycsb?w=1";
}
url = OptionsSupport.updateUrl(url, props);
if (!url.startsWith("mongodb://")) {
System.err
.println("ERROR: Invalid URL: '"
+ url
+ "'. Must be of the form "
+ "'mongodb://<host1>:<port1>,<host2>:<port2>/database?options'. "
+ "See http://docs.mongodb.org/manual/reference/connection-string/.");
System.exit(1);
}
try {
MongoClientURI uri = new MongoClientURI(url);
String uriDb = uri.getDatabase();
if (!defaultedUrl && (uriDb != null) && !uriDb.isEmpty()
&& !"admin".equals(uriDb)) {
database = uriDb;
}
else {
database = props.getProperty("mongodb.database", "ycsb");
}
if ((database == null) || database.isEmpty()) {
System.err
.println("ERROR: Invalid URL: '"
+ url
+ "'. Must provide a database name with the URI. "
+ "'mongodb://<host1>:<port1>,<host2>:<port2>/database");
System.exit(1);
}
readPreference = uri.getOptions().getReadPreference();
writeConcern = uri.getOptions().getWriteConcern();
mongo = new MongoClient(uri);
System.out.println("mongo connection created with " + url);
}
catch (Exception e1) {
System.err
.println("Could not initialize MongoDB connection pool for Loader: "
+ e1.toString());
e1.printStackTrace();
return;
}
}
}
/**
* Insert a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
......@@ -340,54 +292,6 @@ public class MongoDbClient extends DB {
}
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table
* The name of the table
* @param key
* The record key of the record to write.
* @param values
* A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error. See this class's
* description for a discussion of error codes.
*/
@Override
public int update(String table, String key,
HashMap<String, ByteIterator> values) {
com.mongodb.DB db = null;
try {
db = mongo.getDB(database);
db.requestStart();
DBCollection collection = db.getCollection(table);
DBObject q = new BasicDBObject().append("_id", key);
DBObject u = new BasicDBObject();
DBObject fieldsToSet = new BasicDBObject();
Iterator<String> keys = values.keySet().iterator();
while (keys.hasNext()) {
String tmpKey = keys.next();
fieldsToSet.put(tmpKey, values.get(tmpKey).toArray());
}
u.put("$set", fieldsToSet);
collection.update(q, u, false, false, writeConcern);
return 0;
}
catch (Exception e) {
System.err.println(e.toString());
return 1;
}
finally {
if (db != null) {
db.requestDone();
}
}
}
/**
* Perform a range scan for a set of records in the database. Each
* field/value pair from the result will be stored in a HashMap.
......@@ -448,6 +352,54 @@ public class MongoDbClient extends DB {
}
/**
* Update a record in the database. Any field/value pairs in the specified
* values HashMap will be written into the record with the specified record
* key, overwriting any existing values with the same field name.
*
* @param table
* The name of the table
* @param key
* The record key of the record to write.
* @param values
* A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error. See this class's
* description for a discussion of error codes.
*/
@Override
public int update(String table, String key,
HashMap<String, ByteIterator> values) {
com.mongodb.DB db = null;
try {
db = mongo.getDB(database);
db.requestStart();
DBCollection collection = db.getCollection(table);
DBObject q = new BasicDBObject().append("_id", key);
DBObject u = new BasicDBObject();
DBObject fieldsToSet = new BasicDBObject();
Iterator<String> keys = values.keySet().iterator();
while (keys.hasNext()) {
String tmpKey = keys.next();
fieldsToSet.put(tmpKey, values.get(tmpKey).toArray());
}
u.put("$set", fieldsToSet);
collection.update(q, u, false, false, writeConcern);
return 0;
}
catch (Exception e) {
System.err.println(e.toString());
return 1;
}
finally {
if (db != null) {
db.requestDone();
}
}
}
/**
* Fills the map with the values from the DBObject.
*
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment