From 73e2b18838887b44b4c8b8b7f68d7107c5d199d0 Mon Sep 17 00:00:00 2001
From: Michael Nitschinger <michael@nitschinger.at>
Date: Thu, 28 Apr 2016 17:40:46 +0200
Subject: [PATCH] [couchbase2] Refactor scan operation.

This changeset refactors the scan operation slightly, using a List
with a predefined size (since we know it), as well as using the
document ID directly for the response rather than doing full blown
JSON decoding which is not needed.
---
 .../ycsb/db/couchbase2/Couchbase2Client.java  | 43 ++++++++++++-------
 1 file changed, 28 insertions(+), 15 deletions(-)

diff --git a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java
index e89a3dff..3d0bc039 100644
--- a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java
+++ b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java
@@ -102,6 +102,7 @@ public class Couchbase2Client extends DB {
     System.setProperty("com.couchbase.query.encodedPlanEnabled", "false");
   }
 
+  private static final String SEPARATOR = ":";
   private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Couchbase2Client.class);
   private static final Object INIT_COORDINATOR = new Object();
 
@@ -125,6 +126,7 @@ public class Couchbase2Client extends DB {
   private int boost;
   private int networkMetricsInterval;
   private int runtimeMetricsInterval;
+  private String scanAllQuery;
 
   @Override
   public void init() throws DBException {
@@ -147,6 +149,7 @@ public class Couchbase2Client extends DB {
     boost = Integer.parseInt(props.getProperty("couchbase.boost", "3"));
     networkMetricsInterval = Integer.parseInt(props.getProperty("couchbase.networkMetricsInterval", "0"));
     runtimeMetricsInterval = Integer.parseInt(props.getProperty("couchbase.runtimeMetricsInterval", "0"));
+    scanAllQuery =  "SELECT meta().id as id FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2";
 
     try {
       synchronized (INIT_COORDINATOR) {
@@ -603,18 +606,19 @@ public class Couchbase2Client extends DB {
    */
   private Status scanAllFields(final String table, final String startkey, final int recordcount,
       final Vector<HashMap<String, ByteIterator>> result) {
-    final String scanQuery = "SELECT meta().id as id FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2";
-    Collection<HashMap<String, ByteIterator>> documents = bucket.async()
+    final List<HashMap<String, ByteIterator>> data = new ArrayList<HashMap<String, ByteIterator>>(recordcount);
+
+    bucket.async()
         .query(N1qlQuery.parameterized(
-        scanQuery,
-        JsonArray.from(formatId(table, startkey), recordcount),
-        N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
-      ))
+          scanAllQuery,
+          JsonArray.from(formatId(table, startkey), recordcount),
+          N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
+        ))
         .doOnNext(new Action1<AsyncN1qlQueryResult>() {
           @Override
           public void call(AsyncN1qlQueryResult result) {
             if (!result.parseSuccess()) {
-              throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanQuery
+              throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanAllQuery
                 + ", Errors: " + result.errors());
             }
           }
@@ -628,7 +632,11 @@ public class Couchbase2Client extends DB {
         .flatMap(new Func1<AsyncN1qlQueryRow, Observable<RawJsonDocument>>() {
           @Override
           public Observable<RawJsonDocument> call(AsyncN1qlQueryRow row) {
-            return bucket.async().get(row.value().getString("id"), RawJsonDocument.class);
+            String id = new String(row.byteValue());
+            return bucket.async().get(
+              id.substring(id.indexOf(table + SEPARATOR), id.lastIndexOf('"')),
+              RawJsonDocument.class
+            );
           }
         })
         .map(new Func1<RawJsonDocument, HashMap<String, ByteIterator>>() {
@@ -639,11 +647,15 @@ public class Couchbase2Client extends DB {
             return tuple;
           }
         })
-        .toList()
         .toBlocking()
-        .single();
+        .forEach(new Action1<HashMap<String, ByteIterator>>() {
+          @Override
+          public void call(HashMap<String, ByteIterator> tuple) {
+            data.add(tuple);
+          }
+        });
 
-    result.addAll(documents);
+    result.addAll(data);
     return Status.OK;
   }
 
@@ -659,15 +671,16 @@ public class Couchbase2Client extends DB {
    */
   private Status scanSpecificFields(final String table, final String startkey, final int recordcount,
       final Set<String> fields, final Vector<HashMap<String, ByteIterator>> result) {
-    String scanQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2";
+    String scanSpecQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName
+        + "` WHERE meta().id >= '$1' LIMIT $2";
     N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized(
-        scanQuery,
+        scanSpecQuery,
         JsonArray.from(formatId(table, startkey), recordcount),
         N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
     ));
 
     if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) {
-      throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanQuery
+      throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanSpecQuery
         + ", Errors: " + queryResult.errors());
     }
 
@@ -780,7 +793,7 @@ public class Couchbase2Client extends DB {
    * @return a document ID that can be used with Couchbase.
    */
   private static String formatId(final String prefix, final String key) {
-    return prefix + ":" + key;
+    return prefix + SEPARATOR + key;
   }
 
   /**
-- 
GitLab