diff --git a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java index cdd57d8462c2bc6dbdaf32188c38a7bfa2749b76..3d0bc0398c76931539859af551c42b6379a664cb 100644 --- a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java +++ b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java @@ -102,6 +102,7 @@ public class Couchbase2Client extends DB { System.setProperty("com.couchbase.query.encodedPlanEnabled", "false"); } + private static final String SEPARATOR = ":"; private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Couchbase2Client.class); private static final Object INIT_COORDINATOR = new Object(); @@ -125,6 +126,7 @@ public class Couchbase2Client extends DB { private int boost; private int networkMetricsInterval; private int runtimeMetricsInterval; + private String scanAllQuery; @Override public void init() throws DBException { @@ -142,11 +144,12 @@ public class Couchbase2Client extends DB { kv = props.getProperty("couchbase.kv", "true").equals("true"); maxParallelism = Integer.parseInt(props.getProperty("couchbase.maxParallelism", "1")); kvEndpoints = Integer.parseInt(props.getProperty("couchbase.kvEndpoints", "1")); - queryEndpoints = Integer.parseInt(props.getProperty("couchbase.queryEndpoints", "5")); + queryEndpoints = Integer.parseInt(props.getProperty("couchbase.queryEndpoints", "1")); epoll = props.getProperty("couchbase.epoll", "false").equals("true"); boost = Integer.parseInt(props.getProperty("couchbase.boost", "3")); networkMetricsInterval = Integer.parseInt(props.getProperty("couchbase.networkMetricsInterval", "0")); runtimeMetricsInterval = Integer.parseInt(props.getProperty("couchbase.runtimeMetricsInterval", "0")); + scanAllQuery = "SELECT meta().id as id FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2"; try { synchronized (INIT_COORDINATOR) { @@ -170,6 +173,9 @@ public class Couchbase2Client extends DB { .callbacksOnIoPool(true) .runtimeMetricsCollectorConfig(runtimeConfig) .networkLatencyMetricsCollectorConfig(latencyConfig) + .socketConnectTimeout(10000) // 10 secs socket connect timeout + .connectTimeout(30000) // 30 secs overall bucket open timeout + .kvTimeout(10000) // 10 instead of 2.5s for KV ops .kvEndpoints(kvEndpoints); // Tune boosting and epoll based on settings @@ -600,18 +606,19 @@ public class Couchbase2Client extends DB { */ private Status scanAllFields(final String table, final String startkey, final int recordcount, final Vector<HashMap<String, ByteIterator>> result) { - final String scanQuery = "SELECT meta().id as id FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2"; - Collection<HashMap<String, ByteIterator>> documents = bucket.async() + final List<HashMap<String, ByteIterator>> data = new ArrayList<HashMap<String, ByteIterator>>(recordcount); + + bucket.async() .query(N1qlQuery.parameterized( - scanQuery, - JsonArray.from(formatId(table, startkey), recordcount), - N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism) - )) + scanAllQuery, + JsonArray.from(formatId(table, startkey), recordcount), + N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism) + )) .doOnNext(new Action1<AsyncN1qlQueryResult>() { @Override public void call(AsyncN1qlQueryResult result) { if (!result.parseSuccess()) { - throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanQuery + throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanAllQuery + ", Errors: " + result.errors()); } } @@ -625,7 +632,11 @@ public class Couchbase2Client extends DB { .flatMap(new Func1<AsyncN1qlQueryRow, Observable<RawJsonDocument>>() { @Override public Observable<RawJsonDocument> call(AsyncN1qlQueryRow row) { - return bucket.async().get(row.value().getString("id"), RawJsonDocument.class); + String id = new String(row.byteValue()); + return bucket.async().get( + id.substring(id.indexOf(table + SEPARATOR), id.lastIndexOf('"')), + RawJsonDocument.class + ); } }) .map(new Func1<RawJsonDocument, HashMap<String, ByteIterator>>() { @@ -636,11 +647,15 @@ public class Couchbase2Client extends DB { return tuple; } }) - .toList() .toBlocking() - .single(); + .forEach(new Action1<HashMap<String, ByteIterator>>() { + @Override + public void call(HashMap<String, ByteIterator> tuple) { + data.add(tuple); + } + }); - result.addAll(documents); + result.addAll(data); return Status.OK; } @@ -656,15 +671,16 @@ public class Couchbase2Client extends DB { */ private Status scanSpecificFields(final String table, final String startkey, final int recordcount, final Set<String> fields, final Vector<HashMap<String, ByteIterator>> result) { - String scanQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2"; + String scanSpecQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName + + "` WHERE meta().id >= '$1' LIMIT $2"; N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized( - scanQuery, + scanSpecQuery, JsonArray.from(formatId(table, startkey), recordcount), N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism) )); if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) { - throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanQuery + throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanSpecQuery + ", Errors: " + queryResult.errors()); } @@ -777,7 +793,7 @@ public class Couchbase2Client extends DB { * @return a document ID that can be used with Couchbase. */ private static String formatId(final String prefix, final String key) { - return prefix + ":" + key; + return prefix + SEPARATOR + key; } /**