diff --git a/mongodb/README.md b/mongodb/README.md index 16717ae2b39c8f21acffc8328d5f5084df582831..fcb8869083c999d3bdcee35ca20c1fd64e66d140 100644 --- a/mongodb/README.md +++ b/mongodb/README.md @@ -108,8 +108,14 @@ See the next section for the list of configuration parameters for MongoDB. - Useful for the insert workload as it will submit the inserts in batches inproving throughput. - Default value is `1`. +- `mongodb.upsert` + - Determines if the insert operation performs an update with the upsert operation or a insert. + Upserts have the advantage that they will continue to work for a partially loaded data set. + - Setting to `true` uses updates, `false` uses insert operations. + - Default value is `false`. + - `mongodb.writeConcern` - - **Deprecated** - Use the `w` and `journal` options on the MongoDB URI provided by the `mongodb.uri`. + - **Deprecated** - Use the `w` and `journal` options on the MongoDB URI provided by the `mongodb.url`. - Allowed values are : - `errors_ignored` - `unacknowledged` @@ -120,7 +126,7 @@ See the next section for the list of configuration parameters for MongoDB. - Default value is `acknowledged`. - `mongodb.readPreference` - - **Deprecated** - Use the `readPreference` options on the MongoDB URI provided by the `mongodb.uri`. + - **Deprecated** - Use the `readPreference` options on the MongoDB URI provided by the `mongodb.url`. - Allowed values are : - `primary` - `primary_preferred` @@ -130,11 +136,11 @@ See the next section for the list of configuration parameters for MongoDB. - Default value is `primary`. - `mongodb.maxconnections` - - **Deprecated** - Use the `maxPoolSize` options on the MongoDB URI provided by the `mongodb.uri`. + - **Deprecated** - Use the `maxPoolSize` options on the MongoDB URI provided by the `mongodb.url`. - Default value is `100`. - `mongodb.threadsAllowedToBlockForConnectionMultiplier` - - **Deprecated** - Use the `waitQueueMultiple` options on the MongoDB URI provided by the `mongodb.uri`. + - **Deprecated** - Use the `waitQueueMultiple` options on the MongoDB URI provided by the `mongodb.url`. - Default value is `5`. For example: diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java index 7db9b0b55e8900c9a00760d74248078863e3dd23..fb2721b981bea5e168db89b902e96f8845a59f26 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java @@ -96,6 +96,9 @@ public class AsyncMongoDbClient extends DB { /** The batch size to use for inserts. */ private static int batchSize; + + /** If true then use updates with the upsert option for inserts. */ + private static boolean useUpsert; /** The bulk inserts pending for the thread. */ private final BatchedWrite.Builder batchedWrite = BatchedWrite.builder() @@ -178,7 +181,11 @@ public class AsyncMongoDbClient extends DB { // Set insert batchsize, default 1 - to be YCSB-original equivalent batchSize = Integer.parseInt(props.getProperty("mongodb.batchsize", "1")); - + + // Set is inserts are done as upserts. Defaults to false. + useUpsert = Boolean.parseBoolean( + props.getProperty("mongodb.upsert", "false")); + // Just use the standard connection format URL // http://docs.mongodb.org/manual/reference/connection-string/ // to configure the client. @@ -255,15 +262,26 @@ public class AsyncMongoDbClient extends DB { // Do an upsert. if (batchSize <= 1) { - long result = collection.update(query, toInsert, - /* multi= */false, /* upsert= */true, writeConcern); - + long result; + if (useUpsert) { + result = collection.update(query, toInsert, + /* multi= */false, /* upsert= */true, writeConcern); + } else { + // Return is not stable pre-SERVER-4381. No exception is success. + collection.insert(writeConcern, toInsert); + result = 1; + } return result == 1 ? 0 : 1; } // Use a bulk insert. try { - batchedWrite.insert(toInsert); + if (useUpsert) { + batchedWrite.update(query, toInsert, /* multi= */false, + /* upsert= */true); + } else { + batchedWrite.insert(toInsert); + } batchedWriteCount += 1; if (batchedWriteCount < batchSize) { diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java index a124ef0967d333f7bd840b12b7d9d1d18de910d4..6a7650b6c5a2c4e30974a9fdc5522fab4dc15e7c 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java @@ -34,6 +34,8 @@ import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; import com.mongodb.client.model.InsertManyOptions; +import com.mongodb.client.model.UpdateOneModel; + import org.bson.Document; import org.bson.types.Binary; @@ -103,6 +105,9 @@ public class MongoDbClient extends DB { /** The batch size to use for inserts. */ private static int batchSize; + /** If true then use updates with the upsert option for inserts. */ + private static boolean useUpsert; + /** The bulk inserts pending for the thread. */ private final List<Document> bulkInserts = new ArrayList<Document>(); @@ -173,6 +178,10 @@ public class MongoDbClient extends DB { // Set insert batchsize, default 1 - to be YCSB-original equivalent batchSize = Integer.parseInt(props.getProperty("batchsize", "1")); + // Set is inserts are done as upserts. Defaults to false. + useUpsert = Boolean.parseBoolean( + props.getProperty("mongodb.upsert", "false")); + // Just use the standard connection format URL // http://docs.mongodb.org/manual/reference/connection-string/ // to configure the client. @@ -251,15 +260,30 @@ public class MongoDbClient extends DB { } if (batchSize == 1) { - // this is effectively an insert, but using an upsert instead due - // to current inability of the framework to clean up after itself - // between test runs. - collection.replaceOne(new Document("_id", toInsert.get("_id")), - toInsert, UPDATE_WITH_UPSERT); + if (useUpsert) { + // this is effectively an insert, but using an upsert instead due + // to current inability of the framework to clean up after itself + // between test runs. + collection.replaceOne(new Document("_id", toInsert.get("_id")), + toInsert, UPDATE_WITH_UPSERT); + } else { + collection.insertOne(toInsert); + } } else { bulkInserts.add(toInsert); if (bulkInserts.size() == batchSize) { - collection.insertMany(bulkInserts, INSERT_UNORDERED); + if (useUpsert) { + List<UpdateOneModel<Document>> updates = + new ArrayList<UpdateOneModel<Document>>(bulkInserts.size()); + for (Document doc : bulkInserts) { + updates.add(new UpdateOneModel<Document>( + new Document("_id", doc.get("_id")), + doc, UPDATE_WITH_UPSERT)); + } + collection.bulkWrite(updates); + } else { + collection.insertMany(bulkInserts, INSERT_UNORDERED); + } bulkInserts.clear(); } } diff --git a/mongodb/src/test/java/com/yahoo/ycsb/db/AbstractDBTestCases.java b/mongodb/src/test/java/com/yahoo/ycsb/db/AbstractDBTestCases.java index a2ced3f543b0d91797e63dfffc7a1989b689a65e..1f0d2e85f372886c2558d1dd0ad18afc62acd204 100644 --- a/mongodb/src/test/java/com/yahoo/ycsb/db/AbstractDBTestCases.java +++ b/mongodb/src/test/java/com/yahoo/ycsb/db/AbstractDBTestCases.java @@ -29,6 +29,7 @@ import java.net.InetAddress; import java.net.Socket; import java.util.Collections; import java.util.HashMap; +import java.util.Properties; import java.util.Set; import java.util.Vector; @@ -189,6 +190,69 @@ public abstract class AbstractDBTestCases { } } + /** + * Test method for {@link DB#insert}, {@link DB#read}, and {@link DB#update} . + */ + @Test + public void testInsertReadUpdateWithUpsert() { + Properties props = new Properties(); + props.setProperty("mongodb.upsert", "true"); + DB client = getDB(props); + + final String table = getClass().getSimpleName(); + final String id = "updateWithUpsert"; + + HashMap<String, ByteIterator> inserted = + new HashMap<String, ByteIterator>(); + inserted.put("a", new ByteArrayByteIterator(new byte[] { 1, 2, 3, 4 })); + int result = client.insert(table, id, inserted); + assertThat("Insert did not return success (0).", result, is(0)); + + HashMap<String, ByteIterator> read = new HashMap<String, ByteIterator>(); + Set<String> keys = Collections.singleton("a"); + result = client.read(table, id, keys, read); + assertThat("Read did not return success (0).", result, is(0)); + for (String key : keys) { + ByteIterator iter = read.get(key); + + assertThat("Did not read the inserted field: " + key, iter, + notNullValue()); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 1))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 2))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 3))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 4))); + assertFalse(iter.hasNext()); + } + + HashMap<String, ByteIterator> updated = new HashMap<String, ByteIterator>(); + updated.put("a", new ByteArrayByteIterator(new byte[] { 5, 6, 7, 8 })); + result = client.update(table, id, updated); + assertThat("Update did not return success (0).", result, is(0)); + + read.clear(); + result = client.read(table, id, null, read); + assertThat("Read, after update, did not return success (0).", result, is(0)); + for (String key : keys) { + ByteIterator iter = read.get(key); + + assertThat("Did not read the inserted field: " + key, iter, + notNullValue()); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 5))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 6))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 7))); + assertTrue(iter.hasNext()); + assertThat(iter.nextByte(), is(Byte.valueOf((byte) 8))); + assertFalse(iter.hasNext()); + } + } + /** * Test method for {@link DB#scan}. */ @@ -243,7 +307,18 @@ public abstract class AbstractDBTestCases { * * @return The test DB. */ - protected abstract DB getDB(); + protected DB getDB() { + return getDB(new Properties()); + } + + /** + * Gets the test DB. + * + * @param props + * Properties to pass to the client. + * @return The test DB. + */ + protected abstract DB getDB(Properties props); /** * Creates a zero padded integer. diff --git a/mongodb/src/test/java/com/yahoo/ycsb/db/AsyncMongoDbClientTest.java b/mongodb/src/test/java/com/yahoo/ycsb/db/AsyncMongoDbClientTest.java index 11b45b2db9b17e4dc8337a6a3472e900670c3e23..ff75def61d7a4041235029f9f9cdf301b9cb2adb 100644 --- a/mongodb/src/test/java/com/yahoo/ycsb/db/AsyncMongoDbClientTest.java +++ b/mongodb/src/test/java/com/yahoo/ycsb/db/AsyncMongoDbClientTest.java @@ -21,7 +21,6 @@ import static org.junit.Assume.assumeNoException; import java.util.Properties; import org.junit.After; -import org.junit.Before; import com.yahoo.ycsb.DB; @@ -33,20 +32,6 @@ public class AsyncMongoDbClientTest extends AbstractDBTestCases { /** The client to use. */ private AsyncMongoDbClient myClient = null; - /** - * Start a test client. - */ - @Before - public void setUp() { - myClient = new AsyncMongoDbClient(); - myClient.setProperties(new Properties()); - try { - myClient.init(); - } catch (Exception error) { - assumeNoException(error); - } - } - /** * Stops the test client. */ @@ -64,11 +49,20 @@ public class AsyncMongoDbClientTest extends AbstractDBTestCases { /** * {@inheritDoc} * <p> - * Overriden to return the {@link AsyncMongoDbClient}. + * Overridden to return the {@link AsyncMongoDbClient}. * </p> */ @Override - protected DB getDB() { + protected DB getDB(Properties props) { + if( myClient == null ) { + myClient = new AsyncMongoDbClient(); + myClient.setProperties(props); + try { + myClient.init(); + } catch (Exception error) { + assumeNoException(error); + } + } return myClient; } } diff --git a/mongodb/src/test/java/com/yahoo/ycsb/db/MongoDbClientTest.java b/mongodb/src/test/java/com/yahoo/ycsb/db/MongoDbClientTest.java index 47a926e42915d8f1a104c07474df3007212ee461..6a0bb58066d790a60f61b93565a102ebbb223c2b 100644 --- a/mongodb/src/test/java/com/yahoo/ycsb/db/MongoDbClientTest.java +++ b/mongodb/src/test/java/com/yahoo/ycsb/db/MongoDbClientTest.java @@ -21,7 +21,6 @@ import static org.junit.Assume.assumeNoException; import java.util.Properties; import org.junit.After; -import org.junit.Before; import com.yahoo.ycsb.DB; @@ -33,20 +32,6 @@ public class MongoDbClientTest extends AbstractDBTestCases { /** The client to use. */ private MongoDbClient myClient = null; - /** - * Start a test client. - */ - @Before - public void setUp() { - myClient = new MongoDbClient(); - myClient.setProperties(new Properties()); - try { - myClient.init(); - } catch (Exception error) { - assumeNoException(error); - } - } - /** * Stops the test client. */ @@ -64,11 +49,20 @@ public class MongoDbClientTest extends AbstractDBTestCases { /** * {@inheritDoc} * <p> - * Overriden to return the {@link MongoDbClient}. + * Overridden to return the {@link MongoDbClient}. * </p> */ @Override - protected DB getDB() { + protected DB getDB(Properties props) { + if( myClient == null ) { + myClient = new MongoDbClient(); + myClient.setProperties(props); + try { + myClient.init(); + } catch (Exception error) { + assumeNoException(error); + } + } return myClient; } }