diff --git a/bin/ycsb b/bin/ycsb index 16c5a82a639e1e4068e5eaa52b63bb1a87cbfc78..a1a4ea206e44c9ed5ac3849bb88a1dc2ae25e656 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -58,6 +58,7 @@ DATABASES = { "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "couchbase" : "com.yahoo.ycsb.db.CouchbaseClient", + "couchbase2" : "com.yahoo.ycsb.db.couchbase2.Couchbase2Client", "dynamodb" : "com.yahoo.ycsb.db.DynamoDBClient", "elasticsearch": "com.yahoo.ycsb.db.ElasticsearchClient", "geode" : "com.yahoo.ycsb.db.GeodeClient", diff --git a/couchbase2/README.md b/couchbase2/README.md new file mode 100644 index 0000000000000000000000000000000000000000..455a4eea02205479b0cc255a82c808e063906ee9 --- /dev/null +++ b/couchbase2/README.md @@ -0,0 +1,115 @@ +<!-- +Copyright (c) 2015 - 2016 YCSB contributors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +# Couchbase (SDK 2.x) Driver for YCSB +This driver is a binding for the YCSB facilities to operate against a Couchbase Server cluster. It uses the official +Couchbase Java SDK (version 2.x) and provides a rich set of configuration options, including support for the N1QL +query language. + +## Quickstart + +### 1. Start Couchbase Server +You need to start a single node or a cluster to point the client at. Please see [http://couchbase.com](couchbase.com) +for more details and instructions. + +### 2. Set up YCSB +You can either download the release zip and run it, or just clone from master. + +``` +git clone git://github.com/brianfrankcooper/YCSB.git +cd YCSB +mvn clean package +``` + +### 3. Run the Workload +Before you can actually run the workload, you need to "load" the data first. + +``` +bin/ycsb load couchbase2 -s -P workloads/workloada +``` + +Then, you can run the workload: + +``` +bin/ycsb run couchbase2 -s -P workloads/workloada +``` + +Please see the general instructions in the `doc` folder if you are not sure how it all works. You can apply a property +(as seen in the next section) like this: + +``` +bin/ycsb run couchbase -s -P workloads/workloada -p couchbase.epoll=true +``` + +## N1QL Index Setup +In general, every time N1QL is used (either implicitly through using `workloade` or through setting `kv=false`) some +kind of index must be present to make it work. Depending on the workload and data size, choosing the right index is +crucial at runtime in order to get the best performance. If in doubt, please ask at the +[forums](http://forums.couchbase.com) or get in touch with our team at Couchbase. + +For `workloade` and the default `readallfields=true` we recommend creating the following index, and if using Couchbase +Server 4.5 or later with the "Memory Optimized Index" setting on the bucket. + +``` +CREATE INDEX wle_idx ON `bucketname`(meta().id); +``` + +For other workloads, different index setups might be even more performant. + +## Performance Considerations +As it is with any benchmark, there are lot of knobs to tune in order to get great or (if you are reading +this and trying to write a competitor benchmark ;-)) bad performance. + +The first setting you should consider, if you are running on Linux 64bit is setting `-p couchbase.epoll=true`. This will +then turn on the Epoll IO mechanisms in the underlying Netty library which provides better performance since it has less +synchronization to do than the NIO default. This only works on Linux, but you are benchmarking on the OS you are +deploying to, right? + +The second option, `boost`, sounds more magic than it actually is. By default this benchmark trades CPU for throughput, +but this can be disabled by setting `-p couchbase.boost=0`. This defaults to 3, and 3 is the number of event loops run +in the IO layer. 3 is a reasonable default but you should set it to the number of **physical** cores you have available +on the machine if you only plan to run one YCSB instance. Make sure (using profiling) to max out your cores, but don't +overdo it. + +## Sync vs Async +By default, since YCSB is sync the code will always wait for the operation to complete. In some cases it can be useful +to just "drive load" and disable the waiting. Note that when the "-p couchbase.syncMutationResponse=false" option is +used, the measured results by YCSB can basically be thrown away. Still helpful sometimes during load phases to speed +them up :) + +## Configuration Options +Since no setup is the same and the goal of YCSB is to deliver realistic benchmarks, here are some setups that you can +tune. Note that if you need more flexibility (let's say a custom transcoder), you still need to extend this driver and +implement the facilities on your own. + +You can set the following properties (with the default settings applied): + + - couchbase.host=127.0.0.1: The hostname from one server. + - couchbase.bucket=default: The bucket name to use. + - couchbase.password=: The password of the bucket. + - couchbase.syncMutationResponse=true: If mutations should wait for the response to complete. + - couchbase.persistTo=0: Persistence durability requirement + - couchbase.replicateTo=0: Replication durability requirement + - couchbase.upsert=false: Use upsert instead of insert or replace. + - couchbase.adhoc=false: If set to true, prepared statements are not used. + - couchbase.kv=true: If set to false, mutation operations will also be performed through N1QL. + - couchbase.maxParallelism=1: The server parallelism for all n1ql queries. + - couchbase.kvEndpoints=1: The number of KV sockets to open per server. + - couchbase.queryEndpoints=5: The number of N1QL Query sockets to open per server. + - couchbase.epoll=false: If Epoll instead of NIO should be used (only available for linux. + - couchbase.boost=3: If > 0 trades CPU for higher throughput. N is the number of event loops, ideally + set to the number of physical cores. Setting higher than that will likely degrade performance. \ No newline at end of file diff --git a/couchbase2/pom.xml b/couchbase2/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..f152a856efb6d94bab62b503e2984cfcea737c35 --- /dev/null +++ b/couchbase2/pom.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Copyright (c) 2015 YCSB contributors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>binding-parent</artifactId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../binding-parent</relativePath> + </parent> + + <artifactId>couchbase2-binding</artifactId> + <name>Couchbase Java SDK 2.x Binding</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>com.couchbase.client</groupId> + <artifactId>java-client</artifactId> + <version>${couchbase2.version}</version> + </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + +</project> diff --git a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java new file mode 100644 index 0000000000000000000000000000000000000000..41695243ee10f392baab65d0ed00cabec02e6cae --- /dev/null +++ b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java @@ -0,0 +1,896 @@ +/** + * Copyright (c) 2016 Yahoo! Inc. All rights reserved. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.couchbase2; + +import com.couchbase.client.core.env.DefaultCoreEnvironment; +import com.couchbase.client.core.env.resources.IoPoolShutdownHook; +import com.couchbase.client.core.logging.CouchbaseLogger; +import com.couchbase.client.core.logging.CouchbaseLoggerFactory; +import com.couchbase.client.deps.com.fasterxml.jackson.core.JsonFactory; +import com.couchbase.client.deps.com.fasterxml.jackson.core.JsonGenerator; +import com.couchbase.client.deps.com.fasterxml.jackson.databind.JsonNode; +import com.couchbase.client.deps.com.fasterxml.jackson.databind.node.ObjectNode; +import com.couchbase.client.deps.io.netty.channel.DefaultSelectStrategyFactory; +import com.couchbase.client.deps.io.netty.channel.EventLoopGroup; +import com.couchbase.client.deps.io.netty.channel.SelectStrategy; +import com.couchbase.client.deps.io.netty.channel.SelectStrategyFactory; +import com.couchbase.client.deps.io.netty.channel.epoll.EpollEventLoopGroup; +import com.couchbase.client.deps.io.netty.channel.nio.NioEventLoopGroup; +import com.couchbase.client.deps.io.netty.util.IntSupplier; +import com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; +import com.couchbase.client.java.CouchbaseCluster; +import com.couchbase.client.java.PersistTo; +import com.couchbase.client.java.ReplicateTo; +import com.couchbase.client.java.document.Document; +import com.couchbase.client.java.document.RawJsonDocument; +import com.couchbase.client.java.document.json.JsonArray; +import com.couchbase.client.java.document.json.JsonObject; +import com.couchbase.client.java.env.CouchbaseEnvironment; +import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; +import com.couchbase.client.java.error.TemporaryFailureException; +import com.couchbase.client.java.query.*; +import com.couchbase.client.java.transcoder.JacksonTransformers; +import com.couchbase.client.java.util.Blocking; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import rx.Observable; +import rx.Subscriber; +import rx.functions.Action1; +import rx.functions.Func1; + +import java.io.StringWriter; +import java.io.Writer; +import java.nio.channels.spi.SelectorProvider; +import java.util.*; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +/** + * A class that wraps the 2.x Couchbase SDK to be used with YCSB. + * + * <p> The following options can be passed when using this database client to override the defaults. + * + * <ul> + * <li><b>couchbase.host=127.0.0.1</b> The hostname from one server.</li> + * <li><b>couchbase.bucket=default</b> The bucket name to use.</li> + * <li><b>couchbase.password=</b> The password of the bucket.</li> + * <li><b>couchbase.syncMutationResponse=true</b> If mutations should wait for the response to complete.</li> + * <li><b>couchbase.persistTo=0</b> Persistence durability requirement</li> + * <li><b>couchbase.replicateTo=0</b> Replication durability requirement</li> + * <li><b>couchbase.upsert=false</b> Use upsert instead of insert or replace.</li> + * <li><b>couchbase.adhoc=false</b> If set to true, prepared statements are not used.</li> + * <li><b>couchbase.kv=true</b> If set to false, mutation operations will also be performed through N1QL.</li> + * <li><b>couchbase.maxParallelism=1</b> The server parallelism for all n1ql queries.</li> + * <li><b>couchbase.kvEndpoints=1</b> The number of KV sockets to open per server.</li> + * <li><b>couchbase.queryEndpoints=5</b> The number of N1QL Query sockets to open per server.</li> + * <li><b>couchbase.epoll=false</b> If Epoll instead of NIO should be used (only available for linux.</li> + * <li><b>couchbase.boost=3</b> If > 0 trades CPU for higher throughput. N is the number of event loops, ideally + * set to the number of physical cores. Setting higher than that will likely degrade performance.</li> + * </ul> + */ +public class Couchbase2Client extends DB { + + static { + // No need to send the full encoded_plan for this benchmark workload, less network overhead! + System.setProperty("com.couchbase.query.encodedPlanEnabled", "false"); + } + + private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Couchbase2Client.class); + private static final Object INIT_COORDINATOR = new Object(); + + private static volatile CouchbaseEnvironment env = null; + + private Cluster cluster; + private Bucket bucket; + private String bucketName; + private boolean upsert; + private PersistTo persistTo; + private ReplicateTo replicateTo; + private boolean syncMutResponse; + private boolean epoll; + private long kvTimeout; + private boolean adhoc; + private boolean kv; + private int maxParallelism; + private String host; + private int kvEndpoints; + private int queryEndpoints; + private int boost; + + @Override + public void init() throws DBException { + Properties props = getProperties(); + + host = props.getProperty("couchbase.host", "127.0.0.1"); + bucketName = props.getProperty("couchbase.bucket", "default"); + String bucketPassword = props.getProperty("couchbase.password", ""); + + upsert = props.getProperty("couchbase.upsert", "false").equals("true"); + persistTo = parsePersistTo(props.getProperty("couchbase.persistTo", "0")); + replicateTo = parseReplicateTo(props.getProperty("couchbase.replicateTo", "0")); + syncMutResponse = props.getProperty("couchbase.syncMutationResponse", "true").equals("true"); + adhoc = props.getProperty("couchbase.adhoc", "false").equals("true"); + kv = props.getProperty("couchbase.kv", "true").equals("true"); + maxParallelism = Integer.parseInt(props.getProperty("couchbase.maxParallelism", "1")); + kvEndpoints = Integer.parseInt(props.getProperty("couchbase.kvEndpoints", "1")); + queryEndpoints = Integer.parseInt(props.getProperty("couchbase.queryEndpoints", "5")); + epoll = props.getProperty("couchbase.epoll", "false").equals("true"); + boost = Integer.parseInt(props.getProperty("couchbase.boost", "3")); + + try { + synchronized (INIT_COORDINATOR) { + if (env == null) { + DefaultCouchbaseEnvironment.Builder builder = DefaultCouchbaseEnvironment + .builder() + .queryEndpoints(queryEndpoints) + .callbacksOnIoPool(true) + .kvEndpoints(kvEndpoints); + + // Tune boosting and epoll based on settings + SelectStrategyFactory factory = boost > 0 ? + new BackoffSelectStrategyFactory() : DefaultSelectStrategyFactory.INSTANCE; + + int poolSize = boost > 0 ? boost : Integer.parseInt( + System.getProperty("com.couchbase.ioPoolSize", Integer.toString(DefaultCoreEnvironment.IO_POOL_SIZE)) + ); + ThreadFactory threadFactory = new DefaultThreadFactory("cb-io", true); + + EventLoopGroup group = epoll ? new EpollEventLoopGroup(poolSize, threadFactory, factory) + : new NioEventLoopGroup(poolSize, threadFactory, SelectorProvider.provider(), factory); + builder.ioPool(group, new IoPoolShutdownHook(group)); + + env = builder.build(); + logParams(); + } + } + + cluster = CouchbaseCluster.create(env, host); + bucket = cluster.openBucket(bucketName, bucketPassword); + kvTimeout = env.kvTimeout(); + } catch (Exception ex) { + throw new DBException("Could not connect to Couchbase Bucket.", ex); + } + + if (!kv && !syncMutResponse) { + throw new DBException("Not waiting for N1QL responses on mutations not yet implemented."); + } + } + + /** + * Helper method to log the CLI params so that on the command line debugging is easier. + */ + private void logParams() { + StringBuilder sb = new StringBuilder(); + + sb.append("host=").append(host); + sb.append(", bucket=").append(bucketName); + sb.append(", upsert=").append(upsert); + sb.append(", persistTo=").append(persistTo); + sb.append(", replicateTo=").append(replicateTo); + sb.append(", syncMutResponse=").append(syncMutResponse); + sb.append(", adhoc=").append(adhoc); + sb.append(", kv=").append(kv); + sb.append(", maxParallelism=").append(maxParallelism); + sb.append(", queryEndpoints=").append(queryEndpoints); + sb.append(", kvEndpoints=").append(kvEndpoints); + sb.append(", queryEndpoints=").append(queryEndpoints); + sb.append(", epoll=").append(epoll); + sb.append(", boost=").append(boost); + + LOGGER.info("===> Using Params: " + sb.toString()); + } + + @Override + public Status read(final String table, final String key, Set<String> fields, + final HashMap<String, ByteIterator> result) { + try { + String docId = formatId(table, key); + if (kv) { + return readKv(docId, fields, result); + } else { + return readN1ql(docId, fields, result); + } + } catch (Exception ex) { + ex.printStackTrace(); + return Status.ERROR; + } + } + + /** + * Performs the {@link #read(String, String, Set, HashMap)} operation via Key/Value ("get"). + * + * @param docId the document ID + * @param fields the fields to be loaded + * @param result the result map where the doc needs to be converted into + * @return The result of the operation. + */ + private Status readKv(final String docId, final Set<String> fields, final HashMap<String, ByteIterator> result) + throws Exception { + RawJsonDocument loaded = bucket.get(docId, RawJsonDocument.class); + if (loaded == null) { + return Status.NOT_FOUND; + } + decode(loaded.content(), fields, result); + return Status.OK; + } + + /** + * Performs the {@link #read(String, String, Set, HashMap)} operation via N1QL ("SELECT"). + * + * If this option should be used, the "-p couchbase.kv=false" property must be set. + * + * @param docId the document ID + * @param fields the fields to be loaded + * @param result the result map where the doc needs to be converted into + * @return The result of the operation. + */ + private Status readN1ql(final String docId, Set<String> fields, final HashMap<String, ByteIterator> result) + throws Exception { + String readQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName + "` USE KEYS [$1]"; + N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized( + readQuery, + JsonArray.from(docId), + N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism) + )); + + if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) { + throw new DBException("Error while parsing N1QL Result. Query: " + readQuery + + ", Errors: " + queryResult.errors()); + } + + N1qlQueryRow row; + try { + row = queryResult.rows().next(); + } catch (NoSuchElementException ex) { + return Status.NOT_FOUND; + } + + JsonObject content = row.value(); + if (fields == null) { + content = content.getObject(bucketName); // n1ql result set scoped under *.bucketName + fields = content.getNames(); + } + + for (String field : fields) { + Object value = content.get(field); + result.put(field, new StringByteIterator(value != null ? value.toString() : "")); + } + + return Status.OK; + } + + @Override + public Status update(final String table, final String key, final HashMap<String, ByteIterator> values) { + if (upsert) { + return upsert(table, key, values); + } + + try { + String docId = formatId(table, key); + if (kv) { + return updateKv(docId, values); + } else { + return updateN1ql(docId, values); + } + } catch (Exception ex) { + ex.printStackTrace(); + return Status.ERROR; + } + } + + /** + * Performs the {@link #update(String, String, HashMap)} operation via Key/Value ("replace"). + * + * @param docId the document ID + * @param values the values to update the document with. + * @return The result of the operation. + */ + private Status updateKv(final String docId, final HashMap<String, ByteIterator> values) { + waitForMutationResponse(bucket.async().replace( + RawJsonDocument.create(docId, encode(values)), + persistTo, + replicateTo + )); + return Status.OK; + } + + /** + * Performs the {@link #update(String, String, HashMap)} operation via N1QL ("UPDATE"). + * + * If this option should be used, the "-p couchbase.kv=false" property must be set. + * + * @param docId the document ID + * @param values the values to update the document with. + * @return The result of the operation. + */ + private Status updateN1ql(final String docId, final HashMap<String, ByteIterator> values) + throws Exception { + String fields = encodeN1qlFields(values); + String updateQuery = "UPDATE `" + bucketName + "` USE KEYS [$1] SET " + fields; + + N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized( + updateQuery, + JsonArray.from(docId), + N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism) + )); + + if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) { + throw new DBException("Error while parsing N1QL Result. Query: " + updateQuery + + ", Errors: " + queryResult.errors()); + } + return Status.OK; + } + + @Override + public Status insert(final String table, final String key, final HashMap<String, ByteIterator> values) { + if (upsert) { + return upsert(table, key, values); + } + + try { + String docId = formatId(table, key); + if (kv) { + return insertKv(docId, values); + } else { + return insertN1ql(docId, values); + } + } catch (Exception ex) { + ex.printStackTrace(); + return Status.ERROR; + } + } + + /** + * Performs the {@link #insert(String, String, HashMap)} operation via Key/Value ("INSERT"). + * + * Note that during the "load" phase it makes sense to retry TMPFAILS (so that even if the server is + * overloaded temporarily the ops will succeed eventually). The current code will retry TMPFAILs + * for maximum of one minute and then bubble up the error. + * + * @param docId the document ID + * @param values the values to update the document with. + * @return The result of the operation. + */ + private Status insertKv(final String docId, final HashMap<String, ByteIterator> values) { + int tries = 60; // roughly 60 seconds with the 1 second sleep, not 100% accurate. + + for(int i = 0; i < tries; i++) { + try { + waitForMutationResponse(bucket.async().insert( + RawJsonDocument.create(docId, encode(values)), + persistTo, + replicateTo + )); + return Status.OK; + } catch (TemporaryFailureException ex) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while sleeping on TMPFAIL backoff.", ex); + } + } + } + + throw new RuntimeException("Still receiving TMPFAIL from the server after trying " + tries + " times. " + + "Check your server."); + } + + /** + * Performs the {@link #insert(String, String, HashMap)} operation via N1QL ("INSERT"). + * + * If this option should be used, the "-p couchbase.kv=false" property must be set. + * + * @param docId the document ID + * @param values the values to update the document with. + * @return The result of the operation. + */ + private Status insertN1ql(final String docId, final HashMap<String, ByteIterator> values) + throws Exception { + String insertQuery = "INSERT INTO `" + bucketName + "`(KEY,VALUE) VALUES ($1,$2)"; + + N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized( + insertQuery, + JsonArray.from(docId, valuesToJsonObject(values)), + N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism) + )); + + if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) { + throw new DBException("Error while parsing N1QL Result. Query: " + insertQuery + + ", Errors: " + queryResult.errors()); + } + return Status.OK; + } + + /** + * Performs an upsert instead of insert or update using either Key/Value or N1QL. + * + * If this option should be used, the "-p couchbase.upsert=true" property must be set. + * + * @param table The name of the table + * @param key The record key of the record to insert. + * @param values A HashMap of field/value pairs to insert in the record + * @return The result of the operation. + */ + private Status upsert(final String table, final String key, final HashMap<String, ByteIterator> values) { + try { + String docId = formatId(table, key); + if (kv) { + return upsertKv(docId, values); + } else { + return upsertN1ql(docId, values); + } + } catch (Exception ex) { + ex.printStackTrace(); + return Status.ERROR; + } + } + + /** + * Performs the {@link #upsert(String, String, HashMap)} operation via Key/Value ("upsert"). + * + * If this option should be used, the "-p couchbase.upsert=true" property must be set. + * + * @param docId the document ID + * @param values the values to update the document with. + * @return The result of the operation. + */ + private Status upsertKv(final String docId, final HashMap<String, ByteIterator> values) { + waitForMutationResponse(bucket.async().upsert( + RawJsonDocument.create(docId, encode(values)), + persistTo, + replicateTo + )); + return Status.OK; + } + + /** + * Performs the {@link #upsert(String, String, HashMap)} operation via N1QL ("UPSERT"). + * + * If this option should be used, the "-p couchbase.upsert=true -p couchbase.kv=false" properties must be set. + * + * @param docId the document ID + * @param values the values to update the document with. + * @return The result of the operation. + */ + private Status upsertN1ql(final String docId, final HashMap<String, ByteIterator> values) + throws Exception { + String upsertQuery = "UPSERT INTO `" + bucketName + "`(KEY,VALUE) VALUES ($1,$2)"; + + N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized( + upsertQuery, + JsonArray.from(docId, valuesToJsonObject(values)), + N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism) + )); + + if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) { + throw new DBException("Error while parsing N1QL Result. Query: " + upsertQuery + + ", Errors: " + queryResult.errors()); + } + return Status.OK; + } + + @Override + public Status delete(final String table, final String key) { + try { + String docId = formatId(table, key); + if (kv) { + return deleteKv(docId); + } else { + return deleteN1ql(docId); + } + } catch (Exception ex) { + ex.printStackTrace(); + return Status.ERROR; + } + } + + /** + * Performs the {@link #delete(String, String)} (String, String)} operation via Key/Value ("remove"). + * + * @param docId the document ID. + * @return The result of the operation. + */ + private Status deleteKv(final String docId) { + waitForMutationResponse(bucket.async().remove( + docId, + persistTo, + replicateTo + )); + return Status.OK; + } + + /** + * Performs the {@link #delete(String, String)} (String, String)} operation via N1QL ("DELETE"). + * + * If this option should be used, the "-p couchbase.kv=false" property must be set. + * + * @param docId the document ID. + * @return The result of the operation. + */ + private Status deleteN1ql(final String docId) throws Exception { + String deleteQuery = "DELETE FROM `" + bucketName + "` USE KEYS [$1]"; + N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized( + deleteQuery, + JsonArray.from(docId), + N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism) + )); + + if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) { + throw new DBException("Error while parsing N1QL Result. Query: " + deleteQuery + + ", Errors: " + queryResult.errors()); + } + return Status.OK; + } + + @Override + public Status scan(final String table, final String startkey, final int recordcount, final Set<String> fields, + final Vector<HashMap<String, ByteIterator>> result) { + try { + if (fields == null || fields.isEmpty()) { + return scanAllFields(table, startkey, recordcount, result); + } else { + return scanSpecificFields(table, startkey, recordcount, fields, result); + } + } catch (Exception ex) { + ex.printStackTrace(); + return Status.ERROR; + } + } + + /** + * Performs the {@link #scan(String, String, int, Set, Vector)} operation, optimized for all fields. + * + * Since the full document bodies need to be loaded anyways, it makes sense to just grab the document IDs + * from N1QL and then perform the bulk loading via KV for better performance. This is a usual pattern with + * Couchbase and shows the benefits of using both N1QL and KV together. + * + * @param table The name of the table + * @param startkey The record key of the first record to read. + * @param recordcount The number of records to read + * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record + * @return The result of the operation. + */ + private Status scanAllFields(final String table, final String startkey, final int recordcount, + final Vector<HashMap<String, ByteIterator>> result) { + final String scanQuery = "SELECT meta().id as id FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2"; + Collection<HashMap<String, ByteIterator>> documents = bucket.async() + .query(N1qlQuery.parameterized( + scanQuery, + JsonArray.from(formatId(table, startkey), recordcount), + N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism) + )) + .doOnNext(new Action1<AsyncN1qlQueryResult>() { + @Override + public void call(AsyncN1qlQueryResult result) { + if (!result.parseSuccess()) { + throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanQuery + + ", Errors: " + result.errors()); + } + } + }) + .flatMap(new Func1<AsyncN1qlQueryResult, Observable<AsyncN1qlQueryRow>>() { + @Override + public Observable<AsyncN1qlQueryRow> call(AsyncN1qlQueryResult result) { + return result.rows(); + } + }) + .flatMap(new Func1<AsyncN1qlQueryRow, Observable<RawJsonDocument>>() { + @Override + public Observable<RawJsonDocument> call(AsyncN1qlQueryRow row) { + return bucket.async().get(row.value().getString("id"), RawJsonDocument.class); + } + }) + .map(new Func1<RawJsonDocument, HashMap<String, ByteIterator>>() { + @Override + public HashMap<String, ByteIterator> call(RawJsonDocument document) { + HashMap<String, ByteIterator> tuple = new HashMap<String, ByteIterator>(); + decode(document.content(), null, tuple); + return tuple; + } + }) + .toList() + .toBlocking() + .single(); + + result.addAll(documents); + return Status.OK; + } + + /** + * Performs the {@link #scan(String, String, int, Set, Vector)} operation N1Ql only for a subset of the fields. + * + * @param table The name of the table + * @param startkey The record key of the first record to read. + * @param recordcount The number of records to read + * @param fields The list of fields to read, or null for all of them + * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record + * @return The result of the operation. + */ + private Status scanSpecificFields(final String table, final String startkey, final int recordcount, + final Set<String> fields, final Vector<HashMap<String, ByteIterator>> result) { + String scanQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2"; + N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized( + scanQuery, + JsonArray.from(formatId(table, startkey), recordcount), + N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism) + )); + + if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) { + throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanQuery + + ", Errors: " + queryResult.errors()); + } + + boolean allFields = fields == null || fields.isEmpty(); + result.ensureCapacity(recordcount); + + for (N1qlQueryRow row : queryResult) { + JsonObject value = row.value(); + if (fields == null) { + value = value.getObject(bucketName); + } + Set<String> f = allFields ? value.getNames() : fields; + HashMap<String, ByteIterator> tuple = new HashMap<String, ByteIterator>(f.size()); + for (String field : f) { + tuple.put(field, new StringByteIterator(value.getString(field))); + } + result.add(tuple); + } + return Status.OK; + } + + /** + * Helper method to block on the response, depending on the property set. + * + * By default, since YCSB is sync the code will always wait for the operation to complete. In some + * cases it can be useful to just "drive load" and disable the waiting. Note that when the + * "-p couchbase.syncMutationResponse=false" option is used, the measured results by YCSB can basically + * be thrown away. Still helpful sometimes during load phases to speed them up :) + * + * @param input the async input observable. + */ + private void waitForMutationResponse(final Observable<? extends Document<?>> input) { + if (!syncMutResponse) { + input.subscribe(new Subscriber<Document<?>>() { + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Document<?> document) { + } + }); + } else { + Blocking.blockForSingle(input, kvTimeout, TimeUnit.MILLISECONDS); + } + } + + /** + * Helper method to turn the values into a String, used with {@link #upsertN1ql(String, HashMap)}. + * + * @param values the values to encode. + * @return the encoded string. + */ + private static String encodeN1qlFields(final HashMap<String, ByteIterator> values) { + if (values.isEmpty()) { + return ""; + } + + StringBuilder sb = new StringBuilder(); + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { + String raw = entry.getValue().toString(); + String escaped = raw.replace("\"", "\\\"").replace("\'", "\\\'"); + sb.append(entry.getKey()).append("=\"").append(escaped).append("\" "); + } + String toReturn = sb.toString(); + return toReturn.substring(0, toReturn.length() - 1); + } + + /** + * Helper method to turn the map of values into a {@link JsonObject} for further use. + * + * @param values the values to transform. + * @return the created json object. + */ + private static JsonObject valuesToJsonObject(final HashMap<String, ByteIterator> values) { + JsonObject result = JsonObject.create(); + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { + result.put(entry.getKey(), entry.getValue().toString()); + } + return result; + } + + /** + * Helper method to join the set of fields into a String suitable for N1QL. + * + * @param fields the fields to join. + * @return the joined fields as a String. + */ + private static String joinFields(final Set<String> fields) { + if (fields == null || fields.isEmpty()) { + return "*"; + } + StringBuilder builder = new StringBuilder(); + for (String f : fields) { + builder.append("`").append(f).append("`").append(","); + } + String toReturn = builder.toString(); + return toReturn.substring(0, toReturn.length() - 1); + } + + /** + * Helper method to turn the prefix and key into a proper document ID. + * + * @param prefix the prefix (table). + * @param key the key itself. + * @return a document ID that can be used with Couchbase. + */ + private static String formatId(final String prefix, final String key) { + return prefix + ":" + key; + } + + /** + * Helper method to parse the "ReplicateTo" property on startup. + * + * @param property the proeprty to parse. + * @return the parsed setting. + */ + private static ReplicateTo parseReplicateTo(final String property) throws DBException { + int value = Integer.parseInt(property); + + switch (value) { + case 0: + return ReplicateTo.NONE; + case 1: + return ReplicateTo.ONE; + case 2: + return ReplicateTo.TWO; + case 3: + return ReplicateTo.THREE; + default: + throw new DBException("\"couchbase.replicateTo\" must be between 0 and 3"); + } + } + + /** + * Helper method to parse the "PersistTo" property on startup. + * + * @param property the proeprty to parse. + * @return the parsed setting. + */ + private static PersistTo parsePersistTo(final String property) throws DBException { + int value = Integer.parseInt(property); + + switch (value) { + case 0: + return PersistTo.NONE; + case 1: + return PersistTo.ONE; + case 2: + return PersistTo.TWO; + case 3: + return PersistTo.THREE; + case 4: + return PersistTo.FOUR; + default: + throw new DBException("\"couchbase.persistTo\" must be between 0 and 4"); + } + } + + /** + * Decode the String from server and pass it into the decoded destination. + * + * @param source the loaded object. + * @param fields the fields to check. + * @param dest the result passed back to YCSB. + */ + private void decode(final String source, final Set<String> fields, + final HashMap<String, ByteIterator> dest) { + try { + JsonNode json = JacksonTransformers.MAPPER.readTree(source); + boolean checkFields = fields != null && !fields.isEmpty(); + for (Iterator<Map.Entry<String, JsonNode>> jsonFields = json.fields(); jsonFields.hasNext();) { + Map.Entry<String, JsonNode> jsonField = jsonFields.next(); + String name = jsonField.getKey(); + if (checkFields && fields.contains(name)) { + continue; + } + JsonNode jsonValue = jsonField.getValue(); + if (jsonValue != null && !jsonValue.isNull()) { + dest.put(name, new StringByteIterator(jsonValue.asText())); + } + } + } catch (Exception e) { + throw new RuntimeException("Could not decode JSON"); + } + } + + /** + * Encode the source into a String for storage. + * + * @param source the source value. + * @return the encoded string. + */ + private String encode(final HashMap<String, ByteIterator> source) { + HashMap<String, String> stringMap = StringByteIterator.getStringMap(source); + ObjectNode node = JacksonTransformers.MAPPER.createObjectNode(); + for (Map.Entry<String, String> pair : stringMap.entrySet()) { + node.put(pair.getKey(), pair.getValue()); + } + JsonFactory jsonFactory = new JsonFactory(); + Writer writer = new StringWriter(); + try { + JsonGenerator jsonGenerator = jsonFactory.createGenerator(writer); + JacksonTransformers.MAPPER.writeTree(jsonGenerator, node); + } catch (Exception e) { + throw new RuntimeException("Could not encode JSON value"); + } + return writer.toString(); + } +} + +/** + * Factory for the {@link BackoffSelectStrategy} to be used with boosting. + */ +class BackoffSelectStrategyFactory implements SelectStrategyFactory { + @Override + public SelectStrategy newSelectStrategy() { + return new BackoffSelectStrategy(); + } +} + +/** + * Custom IO select strategy which trades CPU for throughput, used with the boost setting. + */ +class BackoffSelectStrategy implements SelectStrategy { + + private int counter = 0; + + @Override + public int calculateStrategy(final IntSupplier supplier, final boolean hasTasks) throws Exception { + int selectNowResult = supplier.get(); + if (hasTasks || selectNowResult != 0) { + counter = 0; + return selectNowResult; + } + counter++; + + if (counter > 2000) { + LockSupport.parkNanos(1); + } else if (counter > 3000) { + Thread.yield(); + } else if (counter > 4000) { + LockSupport.parkNanos(1000); + } else if (counter > 5000) { + // defer to blocking select + counter = 0; + return SelectStrategy.SELECT; + } + + return SelectStrategy.CONTINUE; + } +} diff --git a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/package-info.java b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..0eb3b3992b30adf338223870471a1e75ed82ab3b --- /dev/null +++ b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/package-info.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2015 - 2016 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + * The YCSB binding for <a href="http://www.couchbase.com/">Couchbase</a>, new driver. + */ +package com.yahoo.ycsb.db.couchbase2; + diff --git a/distribution/pom.xml b/distribution/pom.xml index e61e5b038a0cf3340a6a5148babf0f2f9a57359c..0027c615ac93b45e8dfa4eea969c62cb2b9090c2 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -59,6 +59,11 @@ LICENSE file. <artifactId>couchbase-binding</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>couchbase2-binding</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>com.yahoo.ycsb</groupId> <artifactId>dynamodb-binding</artifactId> diff --git a/pom.xml b/pom.xml index d1f662fba9e9f46bb25af504d5ba93d878702b46..5f6e1509ffcdddc29adfc9667963d61dcec30d0e 100644 --- a/pom.xml +++ b/pom.xml @@ -90,6 +90,7 @@ LICENSE file. <thrift.version>0.8.0</thrift.version> <hypertable.version>0.9.5.6</hypertable.version> <couchbase.version>1.4.10</couchbase.version> + <couchbase2.version>2.2.6</couchbase2.version> <tarantool.version>1.6.5</tarantool.version> <aerospike.version>3.1.2</aerospike.version> <solr.version>5.4.0</solr.version> @@ -105,6 +106,7 @@ LICENSE file. <module>cassandra</module> <module>cassandra2</module> <module>couchbase</module> + <module>couchbase2</module> <module>distribution</module> <module>dynamodb</module> <module>elasticsearch</module>