Skip to content
Snippets Groups Projects
Commit bac8b695 authored by Jeff Yemin's avatar Jeff Yemin Committed by Robert J. Moore
Browse files

Cleaned up CRUD code for MongoDbClient to improve readability.

Simplified insert CRUD code to always use insertMany even when batchSize == 1,
 as there is no performance penalty for this in the driver.
parent 170b660b
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,7 @@ import java.util.Set; ...@@ -19,6 +19,7 @@ import java.util.Set;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import com.mongodb.client.model.InsertManyOptions;
import org.bson.Document; import org.bson.Document;
import org.bson.types.Binary; import org.bson.types.Binary;
...@@ -53,14 +54,16 @@ import com.yahoo.ycsb.DBException; ...@@ -53,14 +54,16 @@ import com.yahoo.ycsb.DBException;
*/ */
public class MongoDbClient extends DB { public class MongoDbClient extends DB {
/** Update options to do an upsert. */
private static final UpdateOptions UPSERT = new UpdateOptions()
.upsert(true);
/** Used to include a field in a response. */ /** Used to include a field in a response. */
protected static final Integer INCLUDE = Integer.valueOf(1); protected static final Integer INCLUDE = Integer.valueOf(1);
/** The database name to access. */ /** The options to use for inserting many documents */
protected static final InsertManyOptions INSERT_MANY_OPTIONS =
new InsertManyOptions().ordered(false);
/**
* The database name to access.
*/
private static String databaseName; private static String databaseName;
/** The database name to access. */ /** The database name to access. */
...@@ -85,7 +88,7 @@ public class MongoDbClient extends DB { ...@@ -85,7 +88,7 @@ public class MongoDbClient extends DB {
private static int batchSize; private static int batchSize;
/** The bulk inserts pending for the thread. */ /** The bulk inserts pending for the thread. */
private final List<InsertOneModel<Document>> bulkInserts = new ArrayList<InsertOneModel<Document>>(); private final List<Document> bulkInserts = new ArrayList<Document>();
/** /**
* Cleanup any state for this DB. Called once per DB instance; there is one * Cleanup any state for this DB. Called once per DB instance; there is one
...@@ -234,59 +237,22 @@ public class MongoDbClient extends DB { ...@@ -234,59 +237,22 @@ public class MongoDbClient extends DB {
try { try {
MongoCollection<Document> collection = database MongoCollection<Document> collection = database
.getCollection(table); .getCollection(table);
Document criteria = new Document("_id", key);
Document toInsert = new Document("_id", key); Document toInsert = new Document("_id", key);
for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
toInsert.put(entry.getKey(), entry.getValue().toArray()); toInsert.put(entry.getKey(), entry.getValue().toArray());
} }
// Do a single upsert. bulkInserts.add(toInsert);
if (batchSize <= 1) { if (bulkInserts.size() == batchSize) {
UpdateResult result = collection.withWriteConcern(writeConcern) collection.withWriteConcern(writeConcern)
.replaceOne(criteria, toInsert, UPSERT); .insertMany(bulkInserts, INSERT_MANY_OPTIONS);
if (!result.wasAcknowledged()
|| result.getMatchedCount() > 0
|| (result.isModifiedCountAvailable() && (result
.getModifiedCount() > 0))
|| result.getUpsertedId() != null) {
return 0;
}
System.err.println("Nothing inserted for key " + key);
return 1;
}
// Use a bulk insert.
try {
bulkInserts.add(new InsertOneModel<Document>(toInsert));
if (bulkInserts.size() < batchSize) {
return 0;
}
BulkWriteResult result = collection.withWriteConcern(
writeConcern).bulkWrite(bulkInserts,
new BulkWriteOptions().ordered(false));
if (!result.wasAcknowledged()
|| result.getInsertedCount() == bulkInserts.size()) {
bulkInserts.clear();
return 0;
}
System.err
.println("Number of inserted documents doesn't match the number sent, "
+ result.getInsertedCount()
+ " inserted, sent " + bulkInserts.size());
bulkInserts.clear(); bulkInserts.clear();
return 1;
}
catch (Exception e) {
System.err.println("Exception while trying bulk insert with "
+ bulkInserts.size());
e.printStackTrace();
return 1;
} }
return 0;
} }
catch (Exception e) { catch (Exception e) {
System.err.println("Exception while trying bulk insert with "
+ bulkInserts.size());
e.printStackTrace(); e.printStackTrace();
return 1; return 1;
} }
...@@ -314,22 +280,22 @@ public class MongoDbClient extends DB { ...@@ -314,22 +280,22 @@ public class MongoDbClient extends DB {
MongoCollection<Document> collection = database MongoCollection<Document> collection = database
.getCollection(table); .getCollection(table);
Document query = new Document("_id", key); Document query = new Document("_id", key);
Document fieldsToReturn = new Document();
FindIterable<Document> findIterable = collection
.withReadPreference(readPreference)
.find(query);
Document queryResult = null; Document queryResult = null;
if (fields != null) { if (fields != null) {
Iterator<String> iter = fields.iterator(); Document projection = new Document();
while (iter.hasNext()) { for (String field : fields) {
fieldsToReturn.put(iter.next(), INCLUDE); projection.put(field, INCLUDE);
} }
queryResult = collection.withReadPreference(readPreference) findIterable.projection(projection);
.find(query).projection(fieldsToReturn).first();
}
else {
queryResult = collection.withReadPreference(readPreference)
.find(query).first();
} }
queryResult = findIterable.first();
if (queryResult != null) { if (queryResult != null) {
fillMap(result, queryResult); fillMap(result, queryResult);
} }
...@@ -362,8 +328,7 @@ public class MongoDbClient extends DB { ...@@ -362,8 +328,7 @@ public class MongoDbClient extends DB {
@Override @Override
public int scan(String table, String startkey, int recordcount, public int scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
FindIterable<Document> cursor = null; MongoCursor<Document> cursor = null;
MongoCursor<Document> iter = null;
try { try {
MongoCollection<Document> collection = database MongoCollection<Document> collection = database
.getCollection(table); .getCollection(table);
...@@ -380,18 +345,15 @@ public class MongoDbClient extends DB { ...@@ -380,18 +345,15 @@ public class MongoDbClient extends DB {
} }
cursor = collection.withReadPreference(readPreference).find(query) cursor = collection.withReadPreference(readPreference).find(query)
.projection(projection).sort(sort).limit(recordcount); .projection(projection).sort(sort).limit(recordcount).iterator();
if (!cursor.hasNext()) {
// Do the query.
iter = cursor.iterator();
if (!iter.hasNext()) {
System.err.println("Nothing found in scan for key " + startkey); System.err.println("Nothing found in scan for key " + startkey);
return 1; return 1;
} }
while (iter.hasNext()) { while (cursor.hasNext()) {
HashMap<String, ByteIterator> resultMap = new HashMap<String, ByteIterator>(); HashMap<String, ByteIterator> resultMap = new HashMap<String, ByteIterator>();
Document obj = iter.next(); Document obj = cursor.next();
fillMap(resultMap, obj); fillMap(resultMap, obj);
result.add(resultMap); result.add(resultMap);
...@@ -404,8 +366,8 @@ public class MongoDbClient extends DB { ...@@ -404,8 +366,8 @@ public class MongoDbClient extends DB {
return 1; return 1;
} }
finally { finally {
if (iter != null) { if (cursor != null) {
iter.close(); cursor.close();
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment