Skip to content
Snippets Groups Projects
Commit 6970ee11 authored by Robert J. Moore's avatar Robert J. Moore
Browse files

Add majority write concern and deprecated most of the options. Migrated

to the 3.0 driver from MongoDB Inc. and updated to the new Document and
fluent APIs.
parent eaebc719
No related branches found
No related tags found
No related merge requests found
...@@ -77,14 +77,47 @@ See the next section for the list of configuration parameters for MongoDB. ...@@ -77,14 +77,47 @@ See the next section for the list of configuration parameters for MongoDB.
## MongoDB Configuration Parameters ## MongoDB Configuration Parameters
- `mongodb.url` default: `mongodb://localhost:27017/ycsb?w=1` - `mongodb.url`
- This should be a MongoDB URI or connection string. - This should be a MongoDB URI or connection string.
- See http://docs.mongodb.org/manual/reference/connection-string/ for the standard options. - See http://docs.mongodb.org/manual/reference/connection-string/ for the standard options.
- For the complete set of options for the asynchronous driver see: - For the complete set of options for the asynchronous driver see:
- http://www.allanbank.com/mongodb-async-driver/apidocs/index.html?com/allanbank/mongodb/MongoDbUri.html - http://www.allanbank.com/mongodb-async-driver/apidocs/index.html?com/allanbank/mongodb/MongoDbUri.html
- For the complete set of options for the synchronous driver see: - For the complete set of options for the synchronous driver see:
- http://api.mongodb.org/java/current/index.html?com/mongodb/MongoClientURI.html - http://api.mongodb.org/java/current/index.html?com/mongodb/MongoClientURI.html
- Default value is `mongodb://localhost:27017/ycsb?w=1`
- `mongodb.batchsize`
- Useful for the insert workload as it will submit the inserts in batches inproving throughput.
- Default value is `1`.
- `mongodb.writeConcern`
- **Deprecated** - Use the `w` and `journal` options on the MongoDB URI provided by the `mongodb.uri`.
- Allowed values are :
- `errors_ignored`
- `unacknowledged`
- `acknowledged`
- `journaled`
- `replica_acknowledged`
- `majority`
- Default value is `acknowledged`.
- `mongodb.readPreference`
- **Deprecated** - Use the `readPreference` options on the MongoDB URI provided by the `mongodb.uri`.
- Allowed values are :
- `primary`
- `primary_preferred`
- `secondary`
- `secondary_preferred`
- `nearest`
- Default value is `primary`.
- `mongodb.maxconnections`
- **Deprecated** - Use the `maxPoolSize` options on the MongoDB URI provided by the `mongodb.uri`.
- Default value is `100`.
- `mongodb.threadsAllowedToBlockForConnectionMultiplier`
- **Deprecated** - Use the `waitQueueMultiple` options on the MongoDB URI provided by the `mongodb.uri`.
- Default value is `5`.
For example: For example:
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
</parent> </parent>
<artifactId>mongodb-binding</artifactId> <artifactId>mongodb-binding</artifactId>
<name>Mongo DB Binding</name> <name>MongoDB Binding</name>
<packaging>jar</packaging> <packaging>jar</packaging>
<dependencies> <dependencies>
...@@ -33,6 +33,13 @@ ...@@ -33,6 +33,13 @@
<artifactId>logback-classic</artifactId> <artifactId>logback-classic</artifactId>
<version>1.1.2</version> <version>1.1.2</version>
</dependency> </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
...@@ -42,7 +42,10 @@ import com.allanbank.mongodb.bson.ElementType; ...@@ -42,7 +42,10 @@ import com.allanbank.mongodb.bson.ElementType;
import com.allanbank.mongodb.bson.builder.BuilderFactory; import com.allanbank.mongodb.bson.builder.BuilderFactory;
import com.allanbank.mongodb.bson.builder.DocumentBuilder; import com.allanbank.mongodb.bson.builder.DocumentBuilder;
import com.allanbank.mongodb.bson.element.BinaryElement; import com.allanbank.mongodb.bson.element.BinaryElement;
import com.allanbank.mongodb.builder.BatchedWrite;
import com.allanbank.mongodb.builder.BatchedWriteMode;
import com.allanbank.mongodb.builder.Find; import com.allanbank.mongodb.builder.Find;
import com.allanbank.mongodb.builder.Sort;
import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException; import com.yahoo.ycsb.DBException;
...@@ -59,8 +62,11 @@ import com.yahoo.ycsb.DBException; ...@@ -59,8 +62,11 @@ import com.yahoo.ycsb.DBException;
*/ */
public class AsyncMongoDbClient extends DB { public class AsyncMongoDbClient extends DB {
/** Used to include a field in a response. */
protected static final int INCLUDE = 1;
/** The database to use. */ /** The database to use. */
private static String database; private static String databaseName;
/** Thread local document builder. */ /** Thread local document builder. */
private static final ThreadLocal<DocumentBuilder> DOCUMENT_BUILDER = new ThreadLocal<DocumentBuilder>() { private static final ThreadLocal<DocumentBuilder> DOCUMENT_BUILDER = new ThreadLocal<DocumentBuilder>() {
...@@ -74,7 +80,7 @@ public class AsyncMongoDbClient extends DB { ...@@ -74,7 +80,7 @@ public class AsyncMongoDbClient extends DB {
private static final AtomicInteger initCount = new AtomicInteger(0); private static final AtomicInteger initCount = new AtomicInteger(0);
/** The connection to MongoDB. */ /** The connection to MongoDB. */
private static MongoClient mongo; private static MongoClient mongoClient;
/** The write concern for the requests. */ /** The write concern for the requests. */
private static Durability writeConcern; private static Durability writeConcern;
...@@ -83,7 +89,17 @@ public class AsyncMongoDbClient extends DB { ...@@ -83,7 +89,17 @@ public class AsyncMongoDbClient extends DB {
private static ReadPreference readPreference; private static ReadPreference readPreference;
/** The database to MongoDB. */ /** The database to MongoDB. */
private MongoDatabase db; private MongoDatabase database;
/** The batch size to use for inserts. */
private static int batchSize;
/** The bulk inserts pending for the thread. */
private final BatchedWrite.Builder batchedWrite = BatchedWrite.builder()
.mode(BatchedWriteMode.REORDERED);
/** The number of writes in the batchedWrite. */
private int batchedWriteCount = 0;
/** /**
* Cleanup any state for this DB. Called once per DB instance; there is one * Cleanup any state for this DB. Called once per DB instance; there is one
...@@ -91,9 +107,9 @@ public class AsyncMongoDbClient extends DB { ...@@ -91,9 +107,9 @@ public class AsyncMongoDbClient extends DB {
*/ */
@Override @Override
public final void cleanup() throws DBException { public final void cleanup() throws DBException {
if (initCount.decrementAndGet() <= 0) { if (initCount.decrementAndGet() == 0) {
try { try {
mongo.close(); mongoClient.close();
} }
catch (final Exception e1) { catch (final Exception e1) {
System.err.println("Could not close MongoDB connection pool: " System.err.println("Could not close MongoDB connection pool: "
...@@ -101,6 +117,10 @@ public class AsyncMongoDbClient extends DB { ...@@ -101,6 +117,10 @@ public class AsyncMongoDbClient extends DB {
e1.printStackTrace(); e1.printStackTrace();
return; return;
} }
finally {
mongoClient = null;
database = null;
}
} }
} }
...@@ -117,10 +137,14 @@ public class AsyncMongoDbClient extends DB { ...@@ -117,10 +137,14 @@ public class AsyncMongoDbClient extends DB {
@Override @Override
public final int delete(final String table, final String key) { public final int delete(final String table, final String key) {
try { try {
final MongoCollection collection = db.getCollection(table); final MongoCollection collection = database.getCollection(table);
final Document q = BuilderFactory.start().add("_id", key).build(); final Document q = BuilderFactory.start().add("_id", key).build();
final long res = collection.delete(q, writeConcern); final long res = collection.delete(q, writeConcern);
return res == 1 ? 0 : 1; if (res == 0) {
System.err.println("Nothing deleted for key " + key);
return 1;
}
return 0;
} }
catch (final Exception e) { catch (final Exception e) {
System.err.println(e.toString()); System.err.println(e.toString());
...@@ -137,22 +161,27 @@ public class AsyncMongoDbClient extends DB { ...@@ -137,22 +161,27 @@ public class AsyncMongoDbClient extends DB {
final int count = initCount.incrementAndGet(); final int count = initCount.incrementAndGet();
synchronized (AsyncMongoDbClient.class) { synchronized (AsyncMongoDbClient.class) {
if (mongo != null) { final Properties props = getProperties();
db = mongo.getDatabase(database);
if (mongoClient != null) {
database = mongoClient.getDatabase(databaseName);
// If there are more threads (count) than connections then the // If there are more threads (count) than connections then the
// Low latency spin lock is not really needed as we will keep // Low latency spin lock is not really needed as we will keep
// the connections occupied. // the connections occupied.
if (count > mongo.getConfig().getMaxConnectionCount()) { if (count > mongoClient.getConfig().getMaxConnectionCount()) {
mongo.getConfig().setLockType(LockType.MUTEX); mongoClient.getConfig().setLockType(LockType.MUTEX);
} }
return; return;
} }
// Set insert batchsize, default 1 - to be YCSB-original equivalent
batchSize = Integer.parseInt(props.getProperty("mongodb.batchsize", "1"));
// Just use the standard connection format URL // Just use the standard connection format URL
// http://docs.mongodb.org/manual/reference/connection-string/ // http://docs.mongodatabase.org/manual/reference/connection-string/
final Properties props = getProperties(); // to configure the client.
String url = props.getProperty("mongodb.url", String url = props.getProperty("mongodb.url",
"mongodb://localhost:27017/ycsb?w=1"); "mongodb://localhost:27017/ycsb?w=1");
if (!url.startsWith("mongodb://")) { if (!url.startsWith("mongodb://")) {
...@@ -168,8 +197,8 @@ public class AsyncMongoDbClient extends DB { ...@@ -168,8 +197,8 @@ public class AsyncMongoDbClient extends DB {
MongoDbUri uri = new MongoDbUri(url); MongoDbUri uri = new MongoDbUri(url);
try { try {
database = uri.getDatabase(); databaseName = uri.getDatabase();
if ((database == null) || database.isEmpty()) { if ((databaseName == null) || databaseName.isEmpty()) {
System.err System.err
.println("ERROR: Invalid URL: '" .println("ERROR: Invalid URL: '"
+ url + url
...@@ -178,9 +207,9 @@ public class AsyncMongoDbClient extends DB { ...@@ -178,9 +207,9 @@ public class AsyncMongoDbClient extends DB {
System.exit(1); System.exit(1);
} }
mongo = MongoFactory.createClient(uri); mongoClient = MongoFactory.createClient(uri);
MongoClientConfiguration config = mongo.getConfig(); MongoClientConfiguration config = mongoClient.getConfig();
if (!url.toLowerCase().contains("locktype=")) { if (!url.toLowerCase().contains("locktype=")) {
config.setLockType(LockType.LOW_LATENCY_SPIN); // assumed... config.setLockType(LockType.LOW_LATENCY_SPIN); // assumed...
} }
...@@ -188,7 +217,7 @@ public class AsyncMongoDbClient extends DB { ...@@ -188,7 +217,7 @@ public class AsyncMongoDbClient extends DB {
readPreference = config.getDefaultReadPreference(); readPreference = config.getDefaultReadPreference();
writeConcern = config.getDefaultDurability(); writeConcern = config.getDefaultDurability();
db = mongo.getDatabase(database); database = mongoClient.getDatabase(databaseName);
System.out.println("mongo connection created with " + url); System.out.println("mongo connection created with " + url);
} }
...@@ -213,27 +242,61 @@ public class AsyncMongoDbClient extends DB { ...@@ -213,27 +242,61 @@ public class AsyncMongoDbClient extends DB {
* The record key of the record to insert. * The record key of the record to insert.
* @param values * @param values
* A HashMap of field/value pairs to insert in the record * A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error. See this class's * @return Zero on success, a non-zero error code on error. See the
* description for a discussion of error codes. * {@link DB} class's description for a discussion of error codes.
*/ */
@Override @Override
public final int insert(final String table, final String key, public final int insert(final String table, final String key,
final HashMap<String, ByteIterator> values) { final HashMap<String, ByteIterator> values) {
try { try {
final MongoCollection collection = db.getCollection(table); final MongoCollection collection = database.getCollection(table);
final DocumentBuilder r = DOCUMENT_BUILDER.get().reset() final DocumentBuilder toInsert = DOCUMENT_BUILDER.get().reset()
.add("_id", key); .add("_id", key);
final Document q = r.build(); final Document query = toInsert.build();
for (final Map.Entry<String, ByteIterator> entry : values for (final Map.Entry<String, ByteIterator> entry : values
.entrySet()) { .entrySet()) {
r.add(entry.getKey(), entry.getValue().toArray()); toInsert.add(entry.getKey(), entry.getValue().toArray());
} }
collection.insert(writeConcern, r);
collection.update(q, r, /* multi= */false, /* upsert= */true, // Do an upsert.
writeConcern); if (batchSize <= 1) {
long result = collection.update(query, toInsert,
/* multi= */false, /* upsert= */true, writeConcern);
return 0; return result == 1 ? 0 : 1;
}
// Use a bulk insert.
try {
batchedWrite.insert(toInsert);
batchedWriteCount += 1;
if (batchedWriteCount < batchSize) {
return 0;
}
long count = collection.write(batchedWrite);
if (count == batchedWriteCount) {
batchedWrite.reset().mode(BatchedWriteMode.REORDERED);
batchedWriteCount = 0;
return 0;
}
System.err
.println("Number of inserted documents doesn't match the number sent, "
+ count
+ " inserted, sent "
+ batchedWriteCount);
batchedWrite.reset().mode(BatchedWriteMode.REORDERED);
batchedWriteCount = 0;
return 1;
}
catch (Exception e) {
System.err.println("Exception while trying bulk insert with "
+ batchedWriteCount);
e.printStackTrace();
return 1;
}
} }
catch (final Exception e) { catch (final Exception e) {
e.printStackTrace(); e.printStackTrace();
...@@ -259,18 +322,19 @@ public class AsyncMongoDbClient extends DB { ...@@ -259,18 +322,19 @@ public class AsyncMongoDbClient extends DB {
public final int read(final String table, final String key, public final int read(final String table, final String key,
final Set<String> fields, final HashMap<String, ByteIterator> result) { final Set<String> fields, final HashMap<String, ByteIterator> result) {
try { try {
final MongoCollection collection = db.getCollection(table); final MongoCollection collection = database.getCollection(table);
final DocumentBuilder q = BuilderFactory.start().add("_id", key); final DocumentBuilder query = DOCUMENT_BUILDER.get().reset()
final DocumentBuilder fieldsToReturn = BuilderFactory.start(); .add("_id", key);
Document queryResult = null; Document queryResult = null;
if (fields != null) { if (fields != null) {
final DocumentBuilder fieldsToReturn = BuilderFactory.start();
final Iterator<String> iter = fields.iterator(); final Iterator<String> iter = fields.iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
fieldsToReturn.add(iter.next(), 1); fieldsToReturn.add(iter.next(), 1);
} }
final Find.Builder fb = new Find.Builder(q); final Find.Builder fb = new Find.Builder(query);
fb.projection(fieldsToReturn); fb.projection(fieldsToReturn);
fb.setLimit(1); fb.setLimit(1);
fb.setBatchSize(1); fb.setBatchSize(1);
...@@ -283,7 +347,7 @@ public class AsyncMongoDbClient extends DB { ...@@ -283,7 +347,7 @@ public class AsyncMongoDbClient extends DB {
} }
} }
else { else {
queryResult = collection.findOne(q); queryResult = collection.findOne(query);
} }
if (queryResult != null) { if (queryResult != null) {
...@@ -313,33 +377,37 @@ public class AsyncMongoDbClient extends DB { ...@@ -313,33 +377,37 @@ public class AsyncMongoDbClient extends DB {
* @param result * @param result
* A Vector of HashMaps, where each HashMap is a set field/value * A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one record * pairs for one record
* @return Zero on success, a non-zero error code on error. See this class's * @return Zero on success, a non-zero error code on error. See the
* description for a discussion of error codes. * {@link DB} class's description for a discussion of error codes.
*/ */
@Override @Override
public final int scan(final String table, final String startkey, public final int scan(final String table, final String startkey,
final int recordcount, final Set<String> fields, final int recordcount, final Set<String> fields,
final Vector<HashMap<String, ByteIterator>> result) { final Vector<HashMap<String, ByteIterator>> result) {
try { try {
final MongoCollection collection = db.getCollection(table); final MongoCollection collection = database.getCollection(table);
// { "_id":{"$gte":startKey}} } final Find.Builder find = Find.builder()
final Find.Builder fb = new Find.Builder(); .query(where("_id").greaterThanOrEqualTo(startkey))
fb.setQuery(where("_id").greaterThanOrEqualTo(startkey)); .limit(recordcount).batchSize(recordcount)
fb.setLimit(recordcount); .sort(Sort.asc("_id")).readPreference(readPreference);
fb.setBatchSize(recordcount);
fb.readPreference(readPreference);
if (fields != null) { if (fields != null) {
final DocumentBuilder fieldsDoc = BuilderFactory.start(); final DocumentBuilder fieldsDoc = BuilderFactory.start();
for (final String field : fields) { for (final String field : fields) {
fieldsDoc.add(field, 1); fieldsDoc.add(field, INCLUDE);
} }
fb.projection(fieldsDoc); find.projection(fieldsDoc);
} }
result.ensureCapacity(recordcount); result.ensureCapacity(recordcount);
final MongoIterator<Document> cursor = collection.find(fb.build());
final MongoIterator<Document> cursor = collection.find(find);
if (!cursor.hasNext()) {
System.err.println("Nothing found in scan for key " + startkey);
return 1;
}
while (cursor.hasNext()) { while (cursor.hasNext()) {
// toMap() returns a Map but result.add() expects a // toMap() returns a Map but result.add() expects a
// Map<String,String>. Hence, the suppress warnings. // Map<String,String>. Hence, the suppress warnings.
...@@ -370,23 +438,25 @@ public class AsyncMongoDbClient extends DB { ...@@ -370,23 +438,25 @@ public class AsyncMongoDbClient extends DB {
* The record key of the record to write. * The record key of the record to write.
* @param values * @param values
* A HashMap of field/value pairs to update in the record * A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error. See this class's * @return Zero on success, a non-zero error code on error. See the
* description for a discussion of error codes. * {@link DB} class's description for a discussion of error codes.
*/ */
@Override @Override
public final int update(final String table, final String key, public final int update(final String table, final String key,
final HashMap<String, ByteIterator> values) { final HashMap<String, ByteIterator> values) {
try { try {
final MongoCollection collection = db.getCollection(table); final MongoCollection collection = database.getCollection(table);
final DocumentBuilder q = BuilderFactory.start().add("_id", key); final DocumentBuilder query = BuilderFactory.start()
final DocumentBuilder u = BuilderFactory.start(); .add("_id", key);
final DocumentBuilder fieldsToSet = u.push("$set"); final DocumentBuilder update = BuilderFactory.start();
final DocumentBuilder fieldsToSet = update.push("$set");
for (final Map.Entry<String, ByteIterator> entry : values for (final Map.Entry<String, ByteIterator> entry : values
.entrySet()) { .entrySet()) {
fieldsToSet.add(entry.getKey(), entry.getValue().toArray()); fieldsToSet.add(entry.getKey(), entry.getValue().toArray());
} }
final long res = collection final long res = collection.update(query, update, false, false,
.update(q, u, false, false, writeConcern); writeConcern);
return res == 1 ? 0 : 1; return res == 1 ? 0 : 1;
} }
catch (final Exception e) { catch (final Exception e) {
......
...@@ -3,14 +3,16 @@ ...@@ -3,14 +3,16 @@
* *
* Submitted by Yen Pai on 5/11/2010. * Submitted by Yen Pai on 5/11/2010.
* *
* https://gist.github.com/000a66b8db2caf42467b#file_mongo_db.java * https://gist.github.com/000a66b8db2caf42467b#file_mongo_database.java
* *
*/ */
package com.yahoo.ycsb.db; package com.yahoo.ycsb.db;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
...@@ -23,11 +25,16 @@ import com.mongodb.MongoClient; ...@@ -23,11 +25,16 @@ import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI; import com.mongodb.MongoClientURI;
import com.mongodb.ReadPreference; import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern; import com.mongodb.WriteConcern;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.FindIterable; import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase; import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOptions; import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import com.yahoo.ycsb.ByteArrayByteIterator; import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB; import com.yahoo.ycsb.DB;
...@@ -38,18 +45,25 @@ import com.yahoo.ycsb.DBException; ...@@ -38,18 +45,25 @@ import com.yahoo.ycsb.DBException;
* *
* Properties to set: * Properties to set:
* *
* mongodb.url=mongodb://localhost:27017 mongodb.database=ycsb * mongodatabase.url=mongodb://localhost:27017 mongodatabase.database=ycsb
* mongodb.writeConcern=acknowledged * mongodatabase.writeConcern=acknowledged
* *
* @author ypai * @author ypai
*/ */
public class MongoDbClient extends DB { public class MongoDbClient extends DB {
/** Update options to do an upsert. */
private static final UpdateOptions UPSERT = new UpdateOptions()
.upsert(true);
/** Used to include a field in a response. */ /** Used to include a field in a response. */
protected static final Integer INCLUDE = Integer.valueOf(1); protected static final Integer INCLUDE = Integer.valueOf(1);
/** The database to access. */ /** The database name to access. */
private static String database; private static String databaseName;
/** The database name to access. */
private static MongoDatabase database;
/** /**
* Count the number of times initialized to teardown on the last * Count the number of times initialized to teardown on the last
...@@ -58,7 +72,7 @@ public class MongoDbClient extends DB { ...@@ -58,7 +72,7 @@ public class MongoDbClient extends DB {
private static final AtomicInteger initCount = new AtomicInteger(0); private static final AtomicInteger initCount = new AtomicInteger(0);
/** A singleton Mongo instance. */ /** A singleton Mongo instance. */
private static MongoClient mongo; private static MongoClient mongoClient;
/** The default read preference for the test */ /** The default read preference for the test */
private static ReadPreference readPreference; private static ReadPreference readPreference;
...@@ -66,15 +80,21 @@ public class MongoDbClient extends DB { ...@@ -66,15 +80,21 @@ public class MongoDbClient extends DB {
/** The default write concern for the test. */ /** The default write concern for the test. */
private static WriteConcern writeConcern; private static WriteConcern writeConcern;
/** The batch size to use for inserts. */
private static int batchSize;
/** The bulk inserts pending for the thread. */
private final List<InsertOneModel<Document>> bulkInserts = new ArrayList<InsertOneModel<Document>>();
/** /**
* Cleanup any state for this DB. Called once per DB instance; there is one * Cleanup any state for this DB. Called once per DB instance; there is one
* DB instance per client thread. * DB instance per client thread.
*/ */
@Override @Override
public void cleanup() throws DBException { public void cleanup() throws DBException {
if (initCount.decrementAndGet() <= 0) { if (initCount.decrementAndGet() == 0) {
try { try {
mongo.close(); mongoClient.close();
} }
catch (Exception e1) { catch (Exception e1) {
System.err.println("Could not close MongoDB connection pool: " System.err.println("Could not close MongoDB connection pool: "
...@@ -82,6 +102,10 @@ public class MongoDbClient extends DB { ...@@ -82,6 +102,10 @@ public class MongoDbClient extends DB {
e1.printStackTrace(); e1.printStackTrace();
return; return;
} }
finally {
database = null;
mongoClient = null;
}
} }
} }
...@@ -92,19 +116,22 @@ public class MongoDbClient extends DB { ...@@ -92,19 +116,22 @@ public class MongoDbClient extends DB {
* The name of the table * The name of the table
* @param key * @param key
* The record key of the record to delete. * The record key of the record to delete.
* @return Zero on success, a non-zero error code on error. See this class's * @return Zero on success, a non-zero error code on error. See the
* description for a discussion of error codes. * {@link DB} class's description for a discussion of error codes.
*/ */
@Override @Override
public int delete(String table, String key) { public int delete(String table, String key) {
MongoDatabase db = null;
try { try {
db = mongo.getDatabase(database); MongoCollection<Document> collection = database
MongoCollection<Document> collection = db.getCollection(table); .getCollection(table);
Document q = new Document("_id", key); Document query = new Document("_id", key);
collection.withWriteConcern(writeConcern).deleteOne(q); DeleteResult result = collection.withWriteConcern(writeConcern)
.deleteOne(query);
if (result.getDeletedCount() == 0) {
System.err.println("Nothing deleted for key " + key);
return 1;
}
return 0; return 0;
} }
catch (Exception e) { catch (Exception e) {
...@@ -121,16 +148,21 @@ public class MongoDbClient extends DB { ...@@ -121,16 +148,21 @@ public class MongoDbClient extends DB {
public void init() throws DBException { public void init() throws DBException {
initCount.incrementAndGet(); initCount.incrementAndGet();
synchronized (INCLUDE) { synchronized (INCLUDE) {
if (mongo != null) { if (mongoClient != null) {
return; return;
} }
Properties props = getProperties();
// Set insert batchsize, default 1 - to be YCSB-original equivalent
batchSize = Integer.parseInt(props.getProperty("batchsize", "1"));
// Just use the standard connection format URL // Just use the standard connection format URL
// http://docs.mongodb.org/manual/reference/connection-string/ // http://docs.mongodatabase.org/manual/reference/connection-string/
// to configure the client.
// //
// Support legacy options by updating the URL as appropriate. // Support legacy options by updating the URL as appropriate.
Properties props = getProperties(); String url = props.getProperty("mongodatabase.url", null);
String url = props.getProperty("mongodb.url", null);
boolean defaultedUrl = false; boolean defaultedUrl = false;
if (url == null) { if (url == null) {
defaultedUrl = true; defaultedUrl = true;
...@@ -145,7 +177,7 @@ public class MongoDbClient extends DB { ...@@ -145,7 +177,7 @@ public class MongoDbClient extends DB {
+ url + url
+ "'. Must be of the form " + "'. Must be of the form "
+ "'mongodb://<host1>:<port1>,<host2>:<port2>/database?options'. " + "'mongodb://<host1>:<port1>,<host2>:<port2>/database?options'. "
+ "See http://docs.mongodb.org/manual/reference/connection-string/."); + "See http://docs.mongodatabase.org/manual/reference/connection-string/.");
System.exit(1); System.exit(1);
} }
...@@ -155,13 +187,14 @@ public class MongoDbClient extends DB { ...@@ -155,13 +187,14 @@ public class MongoDbClient extends DB {
String uriDb = uri.getDatabase(); String uriDb = uri.getDatabase();
if (!defaultedUrl && (uriDb != null) && !uriDb.isEmpty() if (!defaultedUrl && (uriDb != null) && !uriDb.isEmpty()
&& !"admin".equals(uriDb)) { && !"admin".equals(uriDb)) {
database = uriDb; databaseName = uriDb;
} }
else { else {
database = props.getProperty("mongodb.database", "ycsb"); databaseName = props.getProperty("mongodatabase.database",
"ycsb");
} }
if ((database == null) || database.isEmpty()) { if ((databaseName == null) || databaseName.isEmpty()) {
System.err System.err
.println("ERROR: Invalid URL: '" .println("ERROR: Invalid URL: '"
+ url + url
...@@ -173,9 +206,11 @@ public class MongoDbClient extends DB { ...@@ -173,9 +206,11 @@ public class MongoDbClient extends DB {
readPreference = uri.getOptions().getReadPreference(); readPreference = uri.getOptions().getReadPreference();
writeConcern = uri.getOptions().getWriteConcern(); writeConcern = uri.getOptions().getWriteConcern();
mongo = new MongoClient(uri); mongoClient = new MongoClient(uri);
database = mongoClient.getDatabase(databaseName);
System.out.println("mongo connection created with " + url); System.out.println("mongo client connection created with "
+ url);
} }
catch (Exception e1) { catch (Exception e1) {
System.err System.err
...@@ -198,32 +233,67 @@ public class MongoDbClient extends DB { ...@@ -198,32 +233,67 @@ public class MongoDbClient extends DB {
* The record key of the record to insert. * The record key of the record to insert.
* @param values * @param values
* A HashMap of field/value pairs to insert in the record * A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error. See this class's * @return Zero on success, a non-zero error code on error. See the
* description for a discussion of error codes. * {@link DB} class's description for a discussion of error codes.
*/ */
@Override @Override
public int insert(String table, String key, public int insert(String table, String key,
HashMap<String, ByteIterator> values) { HashMap<String, ByteIterator> values) {
MongoDatabase db = null;
try { try {
db = mongo.getDatabase(database); MongoCollection<Document> collection = database
.getCollection(table);
MongoCollection<Document> collection = db.getCollection(table);
Document criteria = new Document("_id", key); Document criteria = new Document("_id", key);
Document toInsert = new Document("_id", key); Document toInsert = new Document("_id", key);
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
toInsert.put(entry.getKey(), entry.getValue().toArray()); toInsert.put(entry.getKey(), entry.getValue().toArray());
} }
collection.withWriteConcern(writeConcern).updateOne(criteria, // Do a single upsert.
toInsert, new UpdateOptions().upsert(true)); if (batchSize <= 1) {
UpdateResult result = collection.withWriteConcern(writeConcern)
.updateOne(criteria, toInsert, UPSERT);
if (result.getMatchedCount() > 0
|| result.getModifiedCount() > 0) {
return 0;
}
System.err.println("Nothing inserted for key " + key);
return 1;
}
return 0; // Use a bulk insert.
try {
bulkInserts.add(new InsertOneModel<Document>(toInsert));
if (bulkInserts.size() < batchSize) {
return 0;
}
BulkWriteResult result = collection.withWriteConcern(
writeConcern).bulkWrite(bulkInserts,
new BulkWriteOptions().ordered(false));
if (result.getInsertedCount() == bulkInserts.size()) {
bulkInserts.clear();
return 0;
}
System.err
.println("Number of inserted documents doesn't match the number sent, "
+ result.getInsertedCount()
+ " inserted, sent " + bulkInserts.size());
bulkInserts.clear();
return 1;
}
catch (Exception e) {
System.err.println("Exception while trying bulk insert with "
+ bulkInserts.size());
e.printStackTrace();
return 1;
}
} }
catch (Exception e) { catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
return 1; return 1;
} }
} }
/** /**
...@@ -243,12 +313,10 @@ public class MongoDbClient extends DB { ...@@ -243,12 +313,10 @@ public class MongoDbClient extends DB {
@Override @Override
public int read(String table, String key, Set<String> fields, public int read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) { HashMap<String, ByteIterator> result) {
MongoDatabase db = null;
try { try {
db = mongo.getDatabase(database); MongoCollection<Document> collection = database
.getCollection(table);
MongoCollection<Document> collection = db.getCollection(table); Document query = new Document("_id", key);
Document q = new Document("_id", key);
Document fieldsToReturn = new Document(); Document fieldsToReturn = new Document();
Document queryResult = null; Document queryResult = null;
...@@ -258,11 +326,11 @@ public class MongoDbClient extends DB { ...@@ -258,11 +326,11 @@ public class MongoDbClient extends DB {
fieldsToReturn.put(iter.next(), INCLUDE); fieldsToReturn.put(iter.next(), INCLUDE);
} }
queryResult = collection.withReadPreference(readPreference) queryResult = collection.withReadPreference(readPreference)
.find(q).projection(fieldsToReturn).first(); .find(query).projection(fieldsToReturn).first();
} }
else { else {
queryResult = collection.withReadPreference(readPreference) queryResult = collection.withReadPreference(readPreference)
.find(q).first(); .find(query).first();
} }
if (queryResult != null) { if (queryResult != null) {
...@@ -291,30 +359,39 @@ public class MongoDbClient extends DB { ...@@ -291,30 +359,39 @@ public class MongoDbClient extends DB {
* @param result * @param result
* A Vector of HashMaps, where each HashMap is a set field/value * A Vector of HashMaps, where each HashMap is a set field/value
* pairs for one record * pairs for one record
* @return Zero on success, a non-zero error code on error. See this class's * @return Zero on success, a non-zero error code on error. See the
* description for a discussion of error codes. * {@link DB} class's description for a discussion of error codes.
*/ */
@Override @Override
public int scan(String table, String startkey, int recordcount, public int scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
MongoDatabase db = null;
FindIterable<Document> cursor = null; FindIterable<Document> cursor = null;
MongoCursor<Document> iter = null; MongoCursor<Document> iter = null;
try { try {
db = mongo.getDatabase(database); MongoCollection<Document> collection = database
.getCollection(table);
MongoCollection<Document> collection = db.getCollection(table);
// { "_id":{"$gte":startKey, "$lte":{"appId":key+"\uFFFF"}} }
Document scanRange = new Document("$gte", startkey); Document scanRange = new Document("$gte", startkey);
Document q = new Document("_id", scanRange); Document query = new Document("_id", scanRange);
cursor = collection.withReadPreference(readPreference).find(q) Document sort = new Document("_id", INCLUDE);
.limit(recordcount); Document projection = null;
if (fields != null) {
projection = new Document();
for (String fieldName : fields) {
projection.put(fieldName, INCLUDE);
}
}
cursor = collection.withReadPreference(readPreference).find(query)
.projection(projection).sort(sort).limit(recordcount);
// Do the query.
iter = cursor.iterator(); iter = cursor.iterator();
if (!iter.hasNext()) {
System.err.println("Nothing found in scan for key " + startkey);
return 1;
}
while (iter.hasNext()) { while (iter.hasNext()) {
// toMap() returns a Map, but result.add() expects a
// Map<String,String>. Hence, the suppress warnings.
HashMap<String, ByteIterator> resultMap = new HashMap<String, ByteIterator>(); HashMap<String, ByteIterator> resultMap = new HashMap<String, ByteIterator>();
Document obj = iter.next(); Document obj = iter.next();
...@@ -353,22 +430,23 @@ public class MongoDbClient extends DB { ...@@ -353,22 +430,23 @@ public class MongoDbClient extends DB {
@Override @Override
public int update(String table, String key, public int update(String table, String key,
HashMap<String, ByteIterator> values) { HashMap<String, ByteIterator> values) {
MongoDatabase db = null;
try { try {
db = mongo.getDatabase(database); MongoCollection<Document> collection = database
.getCollection(table);
MongoCollection<Document> collection = db.getCollection(table);
Document q = new Document("_id", key);
Document query = new Document("_id", key);
Document fieldsToSet = new Document(); Document fieldsToSet = new Document();
Iterator<String> keys = values.keySet().iterator(); for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
while (keys.hasNext()) { fieldsToSet.put(entry.getKey(), entry.getValue().toArray());
String tmpKey = keys.next();
fieldsToSet.put(tmpKey, values.get(tmpKey).toArray());
} }
Document u = new Document("$set", fieldsToSet); Document update = new Document("$set", fieldsToSet);
collection.withWriteConcern(writeConcern).updateOne(q, u); UpdateResult result = collection.withWriteConcern(writeConcern)
.updateOne(query, update);
if (result.getMatchedCount() == 0) {
System.err.println("Nothing updated for key " + key);
return 1;
}
return 0; return 0;
} }
catch (Exception e) { catch (Exception e) {
......
...@@ -81,9 +81,14 @@ public final class OptionsSupport { ...@@ -81,9 +81,14 @@ public final class OptionsSupport {
else if ("replica_acknowledged".equals(writeConcernType)) { else if ("replica_acknowledged".equals(writeConcernType)) {
result = addUrlOption(result, "w", "2"); result = addUrlOption(result, "w", "2");
} }
else if ("majority".equals(writeConcernType)) {
result = addUrlOption(result, "w", "majority");
}
else { else {
System.err.println("WARNING: Invalid writeConcern: '" System.err.println("WARNING: Invalid writeConcern: '"
+ writeConcernType + "' will be ignored."); + writeConcernType + "' will be ignored. "
+ "Must be one of [ unacknowledged | acknowledged | "
+ "journaled | replica_acknowledged | majority ]");
} }
} }
...@@ -110,7 +115,9 @@ public final class OptionsSupport { ...@@ -110,7 +115,9 @@ public final class OptionsSupport {
} }
else { else {
System.err.println("WARNING: Invalid readPreference: '" System.err.println("WARNING: Invalid readPreference: '"
+ readPreferenceType + "' will be ignored."); + readPreferenceType + "' will be ignored. "
+ "Must be one of [ primary | primary_preferred | "
+ "secondary | secondary_preferred | nearest ]");
} }
} }
......
...@@ -115,6 +115,10 @@ public class OptionsSupportTest { ...@@ -115,6 +115,10 @@ public class OptionsSupportTest {
updateUrl("mongodb://locahost:27017/?foo=bar", updateUrl("mongodb://locahost:27017/?foo=bar",
props("mongodb.writeConcern", "replica_acknowledged")), props("mongodb.writeConcern", "replica_acknowledged")),
is("mongodb://locahost:27017/?foo=bar&w=2")); is("mongodb://locahost:27017/?foo=bar&w=2"));
assertThat(
updateUrl("mongodb://locahost:27017/?foo=bar",
props("mongodb.writeConcern", "majority")),
is("mongodb://locahost:27017/?foo=bar&w=majority"));
// w already exists. // w already exists.
assertThat( assertThat(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment