Skip to content
Snippets Groups Projects
Commit 48a22a19 authored by allanbank's avatar allanbank
Browse files

Merge pull request #428 from allanbank/pr-400

[mongodb] Add mongodb.upsert configuration option.
parents 354b6286 7e04261d
No related branches found
No related tags found
No related merge requests found
......@@ -108,8 +108,14 @@ See the next section for the list of configuration parameters for MongoDB.
- Useful for the insert workload as it will submit the inserts in batches inproving throughput.
- Default value is `1`.
- `mongodb.upsert`
- Determines if the insert operation performs an update with the upsert operation or a insert.
Upserts have the advantage that they will continue to work for a partially loaded data set.
- Setting to `true` uses updates, `false` uses insert operations.
- Default value is `false`.
- `mongodb.writeConcern`
- **Deprecated** - Use the `w` and `journal` options on the MongoDB URI provided by the `mongodb.uri`.
- **Deprecated** - Use the `w` and `journal` options on the MongoDB URI provided by the `mongodb.url`.
- Allowed values are :
- `errors_ignored`
- `unacknowledged`
......@@ -120,7 +126,7 @@ See the next section for the list of configuration parameters for MongoDB.
- Default value is `acknowledged`.
- `mongodb.readPreference`
- **Deprecated** - Use the `readPreference` options on the MongoDB URI provided by the `mongodb.uri`.
- **Deprecated** - Use the `readPreference` options on the MongoDB URI provided by the `mongodb.url`.
- Allowed values are :
- `primary`
- `primary_preferred`
......@@ -130,11 +136,11 @@ See the next section for the list of configuration parameters for MongoDB.
- Default value is `primary`.
- `mongodb.maxconnections`
- **Deprecated** - Use the `maxPoolSize` options on the MongoDB URI provided by the `mongodb.uri`.
- **Deprecated** - Use the `maxPoolSize` options on the MongoDB URI provided by the `mongodb.url`.
- Default value is `100`.
- `mongodb.threadsAllowedToBlockForConnectionMultiplier`
- **Deprecated** - Use the `waitQueueMultiple` options on the MongoDB URI provided by the `mongodb.uri`.
- **Deprecated** - Use the `waitQueueMultiple` options on the MongoDB URI provided by the `mongodb.url`.
- Default value is `5`.
For example:
......
......@@ -96,6 +96,9 @@ public class AsyncMongoDbClient extends DB {
/** The batch size to use for inserts. */
private static int batchSize;
/** If true then use updates with the upsert option for inserts. */
private static boolean useUpsert;
/** The bulk inserts pending for the thread. */
private final BatchedWrite.Builder batchedWrite = BatchedWrite.builder()
......@@ -178,7 +181,11 @@ public class AsyncMongoDbClient extends DB {
// Set insert batchsize, default 1 - to be YCSB-original equivalent
batchSize = Integer.parseInt(props.getProperty("mongodb.batchsize", "1"));
// Set is inserts are done as upserts. Defaults to false.
useUpsert = Boolean.parseBoolean(
props.getProperty("mongodb.upsert", "false"));
// Just use the standard connection format URL
// http://docs.mongodb.org/manual/reference/connection-string/
// to configure the client.
......@@ -255,15 +262,26 @@ public class AsyncMongoDbClient extends DB {
// Do an upsert.
if (batchSize <= 1) {
long result = collection.update(query, toInsert,
/* multi= */false, /* upsert= */true, writeConcern);
long result;
if (useUpsert) {
result = collection.update(query, toInsert,
/* multi= */false, /* upsert= */true, writeConcern);
} else {
// Return is not stable pre-SERVER-4381. No exception is success.
collection.insert(writeConcern, toInsert);
result = 1;
}
return result == 1 ? 0 : 1;
}
// Use a bulk insert.
try {
batchedWrite.insert(toInsert);
if (useUpsert) {
batchedWrite.update(query, toInsert, /* multi= */false,
/* upsert= */true);
} else {
batchedWrite.insert(toInsert);
}
batchedWriteCount += 1;
if (batchedWriteCount < batchSize) {
......
......@@ -34,6 +34,8 @@ import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
import com.mongodb.client.model.InsertManyOptions;
import com.mongodb.client.model.UpdateOneModel;
import org.bson.Document;
import org.bson.types.Binary;
......@@ -103,6 +105,9 @@ public class MongoDbClient extends DB {
/** The batch size to use for inserts. */
private static int batchSize;
/** If true then use updates with the upsert option for inserts. */
private static boolean useUpsert;
/** The bulk inserts pending for the thread. */
private final List<Document> bulkInserts = new ArrayList<Document>();
......@@ -173,6 +178,10 @@ public class MongoDbClient extends DB {
// Set insert batchsize, default 1 - to be YCSB-original equivalent
batchSize = Integer.parseInt(props.getProperty("batchsize", "1"));
// Set is inserts are done as upserts. Defaults to false.
useUpsert = Boolean.parseBoolean(
props.getProperty("mongodb.upsert", "false"));
// Just use the standard connection format URL
// http://docs.mongodb.org/manual/reference/connection-string/
// to configure the client.
......@@ -251,15 +260,30 @@ public class MongoDbClient extends DB {
}
if (batchSize == 1) {
// this is effectively an insert, but using an upsert instead due
// to current inability of the framework to clean up after itself
// between test runs.
collection.replaceOne(new Document("_id", toInsert.get("_id")),
toInsert, UPDATE_WITH_UPSERT);
if (useUpsert) {
// this is effectively an insert, but using an upsert instead due
// to current inability of the framework to clean up after itself
// between test runs.
collection.replaceOne(new Document("_id", toInsert.get("_id")),
toInsert, UPDATE_WITH_UPSERT);
} else {
collection.insertOne(toInsert);
}
} else {
bulkInserts.add(toInsert);
if (bulkInserts.size() == batchSize) {
collection.insertMany(bulkInserts, INSERT_UNORDERED);
if (useUpsert) {
List<UpdateOneModel<Document>> updates =
new ArrayList<UpdateOneModel<Document>>(bulkInserts.size());
for (Document doc : bulkInserts) {
updates.add(new UpdateOneModel<Document>(
new Document("_id", doc.get("_id")),
doc, UPDATE_WITH_UPSERT));
}
collection.bulkWrite(updates);
} else {
collection.insertMany(bulkInserts, INSERT_UNORDERED);
}
bulkInserts.clear();
}
}
......
......@@ -29,6 +29,7 @@ import java.net.InetAddress;
import java.net.Socket;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
......@@ -189,6 +190,69 @@ public abstract class AbstractDBTestCases {
}
}
/**
* Test method for {@link DB#insert}, {@link DB#read}, and {@link DB#update} .
*/
@Test
public void testInsertReadUpdateWithUpsert() {
Properties props = new Properties();
props.setProperty("mongodb.upsert", "true");
DB client = getDB(props);
final String table = getClass().getSimpleName();
final String id = "updateWithUpsert";
HashMap<String, ByteIterator> inserted =
new HashMap<String, ByteIterator>();
inserted.put("a", new ByteArrayByteIterator(new byte[] { 1, 2, 3, 4 }));
int result = client.insert(table, id, inserted);
assertThat("Insert did not return success (0).", result, is(0));
HashMap<String, ByteIterator> read = new HashMap<String, ByteIterator>();
Set<String> keys = Collections.singleton("a");
result = client.read(table, id, keys, read);
assertThat("Read did not return success (0).", result, is(0));
for (String key : keys) {
ByteIterator iter = read.get(key);
assertThat("Did not read the inserted field: " + key, iter,
notNullValue());
assertTrue(iter.hasNext());
assertThat(iter.nextByte(), is(Byte.valueOf((byte) 1)));
assertTrue(iter.hasNext());
assertThat(iter.nextByte(), is(Byte.valueOf((byte) 2)));
assertTrue(iter.hasNext());
assertThat(iter.nextByte(), is(Byte.valueOf((byte) 3)));
assertTrue(iter.hasNext());
assertThat(iter.nextByte(), is(Byte.valueOf((byte) 4)));
assertFalse(iter.hasNext());
}
HashMap<String, ByteIterator> updated = new HashMap<String, ByteIterator>();
updated.put("a", new ByteArrayByteIterator(new byte[] { 5, 6, 7, 8 }));
result = client.update(table, id, updated);
assertThat("Update did not return success (0).", result, is(0));
read.clear();
result = client.read(table, id, null, read);
assertThat("Read, after update, did not return success (0).", result, is(0));
for (String key : keys) {
ByteIterator iter = read.get(key);
assertThat("Did not read the inserted field: " + key, iter,
notNullValue());
assertTrue(iter.hasNext());
assertThat(iter.nextByte(), is(Byte.valueOf((byte) 5)));
assertTrue(iter.hasNext());
assertThat(iter.nextByte(), is(Byte.valueOf((byte) 6)));
assertTrue(iter.hasNext());
assertThat(iter.nextByte(), is(Byte.valueOf((byte) 7)));
assertTrue(iter.hasNext());
assertThat(iter.nextByte(), is(Byte.valueOf((byte) 8)));
assertFalse(iter.hasNext());
}
}
/**
* Test method for {@link DB#scan}.
*/
......@@ -243,7 +307,18 @@ public abstract class AbstractDBTestCases {
*
* @return The test DB.
*/
protected abstract DB getDB();
protected DB getDB() {
return getDB(new Properties());
}
/**
* Gets the test DB.
*
* @param props
* Properties to pass to the client.
* @return The test DB.
*/
protected abstract DB getDB(Properties props);
/**
* Creates a zero padded integer.
......
......@@ -21,7 +21,6 @@ import static org.junit.Assume.assumeNoException;
import java.util.Properties;
import org.junit.After;
import org.junit.Before;
import com.yahoo.ycsb.DB;
......@@ -33,20 +32,6 @@ public class AsyncMongoDbClientTest extends AbstractDBTestCases {
/** The client to use. */
private AsyncMongoDbClient myClient = null;
/**
* Start a test client.
*/
@Before
public void setUp() {
myClient = new AsyncMongoDbClient();
myClient.setProperties(new Properties());
try {
myClient.init();
} catch (Exception error) {
assumeNoException(error);
}
}
/**
* Stops the test client.
*/
......@@ -64,11 +49,20 @@ public class AsyncMongoDbClientTest extends AbstractDBTestCases {
/**
* {@inheritDoc}
* <p>
* Overriden to return the {@link AsyncMongoDbClient}.
* Overridden to return the {@link AsyncMongoDbClient}.
* </p>
*/
@Override
protected DB getDB() {
protected DB getDB(Properties props) {
if( myClient == null ) {
myClient = new AsyncMongoDbClient();
myClient.setProperties(props);
try {
myClient.init();
} catch (Exception error) {
assumeNoException(error);
}
}
return myClient;
}
}
......@@ -21,7 +21,6 @@ import static org.junit.Assume.assumeNoException;
import java.util.Properties;
import org.junit.After;
import org.junit.Before;
import com.yahoo.ycsb.DB;
......@@ -33,20 +32,6 @@ public class MongoDbClientTest extends AbstractDBTestCases {
/** The client to use. */
private MongoDbClient myClient = null;
/**
* Start a test client.
*/
@Before
public void setUp() {
myClient = new MongoDbClient();
myClient.setProperties(new Properties());
try {
myClient.init();
} catch (Exception error) {
assumeNoException(error);
}
}
/**
* Stops the test client.
*/
......@@ -64,11 +49,20 @@ public class MongoDbClientTest extends AbstractDBTestCases {
/**
* {@inheritDoc}
* <p>
* Overriden to return the {@link MongoDbClient}.
* Overridden to return the {@link MongoDbClient}.
* </p>
*/
@Override
protected DB getDB() {
protected DB getDB(Properties props) {
if( myClient == null ) {
myClient = new MongoDbClient();
myClient.setProperties(props);
try {
myClient.init();
} catch (Exception error) {
assumeNoException(error);
}
}
return myClient;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment