Skip to content
Snippets Groups Projects
Commit 2c83327f authored by Robert J. Moore's avatar Robert J. Moore
Browse files

Merge branch 'jyemin-MongoDbClientCleanup'

parents 6a0d00d7 e28ec24e
No related branches found
No related tags found
No related merge requests found
......@@ -11,7 +11,6 @@ package com.yahoo.ycsb.db;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
......@@ -19,6 +18,8 @@ import java.util.Set;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertManyOptions;
import org.bson.Document;
import org.bson.types.Binary;
......@@ -26,13 +27,10 @@ import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
......@@ -53,14 +51,20 @@ import com.yahoo.ycsb.DBException;
*/
public class MongoDbClient extends DB {
/** Update options to do an upsert. */
private static final UpdateOptions UPSERT = new UpdateOptions()
.upsert(true);
/** Used to include a field in a response. */
protected static final Integer INCLUDE = Integer.valueOf(1);
private static final Integer INCLUDE = Integer.valueOf(1);
/** The database name to access. */
/** The options to use for inserting many documents */
private static final InsertManyOptions INSERT_UNORDERED =
new InsertManyOptions().ordered(false);
/** The options to use for inserting a single document */
private static final UpdateOptions UPDATE_WITH_UPSERT =
new UpdateOptions().upsert(true);
/**
* The database name to access.
*/
private static String databaseName;
/** The database name to access. */
......@@ -85,7 +89,7 @@ public class MongoDbClient extends DB {
private static int batchSize;
/** The bulk inserts pending for the thread. */
private final List<InsertOneModel<Document>> bulkInserts = new ArrayList<InsertOneModel<Document>>();
private final List<Document> bulkInserts = new ArrayList<Document>();
/**
* Cleanup any state for this DB. Called once per DB instance; there is one
......@@ -199,7 +203,9 @@ public class MongoDbClient extends DB {
writeConcern = uri.getOptions().getWriteConcern();
mongoClient = new MongoClient(uri);
database = mongoClient.getDatabase(databaseName);
database = mongoClient.getDatabase(databaseName)
.withReadPreference(readPreference)
.withWriteConcern(writeConcern);
System.out.println("mongo client connection created with "
+ url);
......@@ -234,59 +240,31 @@ public class MongoDbClient extends DB {
try {
MongoCollection<Document> collection = database
.getCollection(table);
Document criteria = new Document("_id", key);
Document toInsert = new Document("_id", key);
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
toInsert.put(entry.getKey(), entry.getValue().toArray());
}
// Do a single upsert.
if (batchSize <= 1) {
UpdateResult result = collection.withWriteConcern(writeConcern)
.replaceOne(criteria, toInsert, UPSERT);
if (!result.wasAcknowledged()
|| result.getMatchedCount() > 0
|| (result.isModifiedCountAvailable() && (result
.getModifiedCount() > 0))
|| result.getUpsertedId() != null) {
return 0;
}
System.err.println("Nothing inserted for key " + key);
return 1;
if (batchSize == 1) {
// this is effectively an insert, but using an upsert instead due
// to current inability of the framework to clean up after itself
// between test runs.
collection.replaceOne(new Document("_id", toInsert.get("_id")),
toInsert,
UPDATE_WITH_UPSERT);
}
// Use a bulk insert.
try {
bulkInserts.add(new InsertOneModel<Document>(toInsert));
if (bulkInserts.size() < batchSize) {
return 0;
}
BulkWriteResult result = collection.withWriteConcern(
writeConcern).bulkWrite(bulkInserts,
new BulkWriteOptions().ordered(false));
if (!result.wasAcknowledged()
|| result.getInsertedCount() == bulkInserts.size()) {
else {
bulkInserts.add(toInsert);
if (bulkInserts.size() == batchSize) {
collection.insertMany(bulkInserts, INSERT_UNORDERED);
bulkInserts.clear();
return 0;
}
System.err
.println("Number of inserted documents doesn't match the number sent, "
+ result.getInsertedCount()
+ " inserted, sent " + bulkInserts.size());
bulkInserts.clear();
return 1;
}
catch (Exception e) {
System.err.println("Exception while trying bulk insert with "
+ bulkInserts.size());
e.printStackTrace();
return 1;
}
return 0;
}
catch (Exception e) {
System.err.println("Exception while trying bulk insert with "
+ bulkInserts.size());
e.printStackTrace();
return 1;
}
......@@ -314,22 +292,19 @@ public class MongoDbClient extends DB {
MongoCollection<Document> collection = database
.getCollection(table);
Document query = new Document("_id", key);
Document fieldsToReturn = new Document();
Document queryResult = null;
FindIterable<Document> findIterable = collection.find(query);
if (fields != null) {
Iterator<String> iter = fields.iterator();
while (iter.hasNext()) {
fieldsToReturn.put(iter.next(), INCLUDE);
Document projection = new Document();
for (String field : fields) {
projection.put(field, INCLUDE);
}
queryResult = collection.withReadPreference(readPreference)
.find(query).projection(fieldsToReturn).first();
}
else {
queryResult = collection.withReadPreference(readPreference)
.find(query).first();
findIterable.projection(projection);
}
Document queryResult = findIterable.first();
if (queryResult != null) {
fillMap(result, queryResult);
}
......@@ -362,8 +337,7 @@ public class MongoDbClient extends DB {
@Override
public int scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
FindIterable<Document> cursor = null;
MongoCursor<Document> iter = null;
MongoCursor<Document> cursor = null;
try {
MongoCollection<Document> collection = database
.getCollection(table);
......@@ -371,27 +345,32 @@ public class MongoDbClient extends DB {
Document scanRange = new Document("$gte", startkey);
Document query = new Document("_id", scanRange);
Document sort = new Document("_id", INCLUDE);
Document projection = null;
FindIterable<Document> findIterable = collection.find(query)
.sort(sort)
.limit(recordcount);
if (fields != null) {
projection = new Document();
Document projection = new Document();
for (String fieldName : fields) {
projection.put(fieldName, INCLUDE);
}
findIterable.projection(projection);
}
cursor = collection.withReadPreference(readPreference).find(query)
.projection(projection).sort(sort).limit(recordcount);
cursor = findIterable.iterator();
// Do the query.
iter = cursor.iterator();
if (!iter.hasNext()) {
if (!cursor.hasNext()) {
System.err.println("Nothing found in scan for key " + startkey);
return 1;
}
while (iter.hasNext()) {
result.ensureCapacity(recordcount);
while (cursor.hasNext()) {
HashMap<String, ByteIterator> resultMap = new HashMap<String, ByteIterator>();
Document obj = iter.next();
Document obj = cursor.next();
fillMap(resultMap, obj);
result.add(resultMap);
......@@ -404,8 +383,8 @@ public class MongoDbClient extends DB {
return 1;
}
finally {
if (iter != null) {
iter.close();
if (cursor != null) {
cursor.close();
}
}
}
......@@ -438,8 +417,7 @@ public class MongoDbClient extends DB {
}
Document update = new Document("$set", fieldsToSet);
UpdateResult result = collection.withWriteConcern(writeConcern)
.updateOne(query, update);
UpdateResult result = collection.updateOne(query, update);
if (result.wasAcknowledged() && result.getMatchedCount() == 0) {
System.err.println("Nothing updated for key " + key);
return 1;
......
......@@ -30,7 +30,7 @@ import java.util.Properties;
public final class OptionsSupport {
/** Value for an unavailable property. */
protected static final String UNAVAILABLE = "n/a";
private static final String UNAVAILABLE = "n/a";
/**
* Updates the URL with the appropriate attributes if legacy properties are
......@@ -76,7 +76,8 @@ public final class OptionsSupport {
result = addUrlOption(result, "w", "1");
}
else if ("journaled".equals(writeConcernType)) {
result = addUrlOption(result, "journal", "true");
result = addUrlOption(result, "journal", "true"); // this is the documented option name
result = addUrlOption(result, "j", "true"); // but keep this until MongoDB Java driver supports "journal" option
}
else if ("replica_acknowledged".equals(writeConcernType)) {
result = addUrlOption(result, "w", "2");
......
......@@ -110,7 +110,7 @@ public class OptionsSupportTest {
assertThat(
updateUrl("mongodb://locahost:27017/?foo=bar",
props("mongodb.writeConcern", "journaled")),
is("mongodb://locahost:27017/?foo=bar&journal=true"));
is("mongodb://locahost:27017/?foo=bar&journal=true&j=true"));
assertThat(
updateUrl("mongodb://locahost:27017/?foo=bar",
props("mongodb.writeConcern", "replica_acknowledged")),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment