From f1eed61e731173a55802ba379d8987acb11b19dd Mon Sep 17 00:00:00 2001
From: Jason Tedor <jason@tedor.me>
Date: Tue, 8 Aug 2017 19:08:29 +0900
Subject: [PATCH] Elasticsearch 5: Code cleanup

This commit is a straightforward code cleanup of the Elasticsearch 5
transport client and REST client implementations.
---
 .../db/elasticsearch5/Elasticsearch5.java     | 35 ++++++++++
 .../elasticsearch5/ElasticsearchClient.java   | 61 ++++++++---------
 .../ElasticsearchRestClient.java              | 68 ++++++++-----------
 3 files changed, 92 insertions(+), 72 deletions(-)
 create mode 100644 elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/Elasticsearch5.java

diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/Elasticsearch5.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/Elasticsearch5.java
new file mode 100644
index 00000000..a67e0447
--- /dev/null
+++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/Elasticsearch5.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2017 YCSB contributors. All rights reserved.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb.db.elasticsearch5;
+
+import java.util.Properties;
+
+final class Elasticsearch5 {
+
+  private Elasticsearch5() {
+
+  }
+
+  static final String KEY = "key";
+
+  static int parseIntegerProperty(final Properties properties, final String key, final int defaultValue) {
+    final String value = properties.getProperty(key);
+    return value == null ? defaultValue : Integer.parseInt(value);
+  }
+
+}
diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java
index a34dc0a2..cf8100dc 100644
--- a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java
+++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java
@@ -26,7 +26,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
@@ -48,6 +47,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.Vector;
 
+import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.KEY;
+import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.parseIntegerProperty;
 import static org.elasticsearch.common.settings.Settings.Builder;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
@@ -61,7 +62,7 @@ public class ElasticsearchClient extends DB {
   private static final String DEFAULT_REMOTE_HOST = "localhost:9300";
   private static final int NUMBER_OF_SHARDS = 1;
   private static final int NUMBER_OF_REPLICAS = 0;
-  private Client client;
+  private TransportClient client;
   private String indexKey;
 
   /**
@@ -75,11 +76,11 @@ public class ElasticsearchClient extends DB {
 
     this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY);
 
-    int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS);
-    int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS);
+    final int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS);
+    final int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS);
 
-    Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false"));
-    Builder settings = Settings.builder().put("cluster.name", DEFAULT_CLUSTER_NAME);
+    final Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false"));
+    final Builder settings = Settings.builder().put("cluster.name", DEFAULT_CLUSTER_NAME);
 
     // if properties file contains elasticsearch user defined properties
     // add it to the settings file (will overwrite the defaults).
@@ -92,30 +93,33 @@ public class ElasticsearchClient extends DB {
       }
     }
     final String clusterName = settings.get("cluster.name");
-    System.err.println("Elasticsearch starting node = " + clusterName);
+    System.out.println("Elasticsearch cluster name = " + clusterName);
 
     settings.put("client.transport.sniff", true)
             .put("client.transport.ignore_cluster_name", false)
             .put("client.transport.ping_timeout", "30s")
             .put("client.transport.nodes_sampler_interval", "30s");
     // Default it to localhost:9300
-    String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(",");
+    final String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(",");
     System.out.println("Elasticsearch Remote Hosts = " + props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST));
-    TransportClient tClient = new PreBuiltTransportClient(settings.build());
+    client = new PreBuiltTransportClient(settings.build());
     for (String h : nodeList) {
       String[] nodes = h.split(":");
+
+      final InetAddress address;
       try {
-        tClient.addTransportAddress(new InetSocketTransportAddress(
-                InetAddress.getByName(nodes[0]),
-                Integer.parseInt(nodes[1])
-        ));
-      } catch (NumberFormatException e) {
-        throw new IllegalArgumentException("Unable to parse port number.", e);
+        address = InetAddress.getByName(nodes[0]);
       } catch (UnknownHostException e) {
-        throw new IllegalArgumentException("Unable to Identify host.", e);
+        throw new IllegalArgumentException("unable to identity host [" + nodes[0]+ "]", e);
+      }
+      final int port;
+      try {
+        port = Integer.parseInt(nodes[1]);
+      } catch (final NumberFormatException e) {
+        throw new IllegalArgumentException("unable to parse port [" + nodes[1] + "]", e);
       }
+      client.addTransportAddress(new InetSocketTransportAddress(address, port));
     }
-    client = tClient;
 
     final boolean exists =
         client.admin().indices()
@@ -136,11 +140,6 @@ public class ElasticsearchClient extends DB {
     client.admin().cluster().health(new ClusterHealthRequest().waitForGreenStatus()).actionGet();
   }
 
-  private int parseIntegerProperty(final Properties properties, final String key, final int defaultValue) {
-    final String value = properties.getProperty(key);
-    return value == null ? defaultValue : Integer.parseInt(value);
-  }
-
   @Override
   public void cleanup() throws DBException {
     if (client != null) {
@@ -158,8 +157,7 @@ public class ElasticsearchClient extends DB {
       for (final Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) {
         doc.field(entry.getKey(), entry.getValue());
       }
-
-      doc.field("key", key);
+      doc.field(KEY, key);
       doc.endObject();
 
       client.prepareIndex(indexKey, table).setSource(doc).execute().actionGet();
@@ -184,9 +182,9 @@ public class ElasticsearchClient extends DB {
       final DeleteResponse deleteResponse = client.prepareDelete(indexKey, table, id).execute().actionGet();
       if (deleteResponse.status().equals(RestStatus.NOT_FOUND)) {
         return Status.NOT_FOUND;
-      } else {
-        return Status.OK;
       }
+
+      return Status.OK;
     } catch (final Exception e) {
       e.printStackTrace();
       return Status.ERROR;
@@ -213,14 +211,14 @@ public class ElasticsearchClient extends DB {
         }
       } else {
         for (final Map.Entry<String, SearchHitField> e : hit.getFields().entrySet()) {
-          if ("key".equals(e.getKey())) {
+          if (KEY.equals(e.getKey())) {
             continue;
           }
           result.put(e.getKey(), new StringByteIterator((String) e.getValue().getValue()));
         }
       }
-      return Status.OK;
 
+      return Status.OK;
     } catch (final Exception e) {
       e.printStackTrace();
       return Status.ERROR;
@@ -243,7 +241,6 @@ public class ElasticsearchClient extends DB {
       client.prepareIndex(indexKey, table, hit.getId()).setSource(hit.getSource()).get();
 
       return Status.OK;
-
     } catch (final Exception e) {
       e.printStackTrace();
       return Status.ERROR;
@@ -258,7 +255,7 @@ public class ElasticsearchClient extends DB {
       final Set<String> fields,
       final Vector<HashMap<String, ByteIterator>> result) {
     try {
-      final RangeQueryBuilder query = new RangeQueryBuilder("key").gte(startkey);
+      final RangeQueryBuilder query = new RangeQueryBuilder(KEY).gte(startkey);
       final SearchResponse response = client.prepareSearch(indexKey).setQuery(query).setSize(recordcount).get();
 
       for (final SearchHit hit : response.getHits()) {
@@ -271,7 +268,7 @@ public class ElasticsearchClient extends DB {
         } else {
           entry = new HashMap<>(hit.getFields().size());
           for (final Map.Entry<String, SearchHitField> field : hit.getFields().entrySet()) {
-            if ("key".equals(field.getKey())) {
+            if (KEY.equals(field.getKey())) {
               continue;
             }
             entry.put(field.getKey(), new StringByteIterator((String) field.getValue().getValue()));
@@ -288,7 +285,7 @@ public class ElasticsearchClient extends DB {
 
 
   private SearchResponse search(final String table, final String key) {
-    return client.prepareSearch(indexKey).setTypes(table).setQuery(new TermQueryBuilder("key", key)).get();
+    return client.prepareSearch(indexKey).setTypes(table).setQuery(new TermQueryBuilder(KEY, key)).get();
   }
 
 }
diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java
index d1c12d63..07952c10 100644
--- a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java
+++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java
@@ -32,7 +32,6 @@ import org.apache.http.nio.entity.NStringEntity;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
 import java.io.IOException;
@@ -46,8 +45,9 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.Vector;
 
+import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.KEY;
+import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.parseIntegerProperty;
 import static java.util.Collections.emptyMap;
-import static org.elasticsearch.common.settings.Settings.Builder;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
 /**
@@ -74,31 +74,15 @@ public class ElasticsearchRestClient extends DB {
 
     this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY);
 
-    int numberOfShards = Integer.valueOf(props.getProperty("es.number_of_shards",
-        String.valueOf(NUMBER_OF_SHARDS)));
-    int numberOfReplicas = Integer.valueOf(props.getProperty("es.number_of_replicas",
-        String.valueOf(NUMBER_OF_REPLICAS)));
-
-    Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false"));
-    Builder settings = Settings.builder().put("cluster.name", DEFAULT_CLUSTER_NAME);
-
-    // if properties file contains elasticsearch user defined properties
-    // add it to the settings file (will overwrite the defaults).
-    for (final Map.Entry<Object, Object> e : props.entrySet()) {
-      if (e.getKey() instanceof String) {
-        final String key = (String) e.getKey();
-        if (key.startsWith("es.setting.")) {
-          settings.put(key.substring("es.setting.".length()), e.getValue());
-        }
-      }
-    }
-    final String clusterName = settings.get("cluster.name");
-    System.err.println("Elasticsearch starting node = " + clusterName);
+    final int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS);
+    final int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS);
+
+    final Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false"));
 
-    String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(",");
+    final String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(",");
     System.out.println("Elasticsearch Remote Hosts = " + props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST));
 
-    List<HttpHost> esHttpHosts = new ArrayList<>(nodeList.length);
+    final List<HttpHost> esHttpHosts = new ArrayList<>(nodeList.length);
     for (String h : nodeList) {
       String[] nodes = h.split(":");
       esHttpHosts.add(new HttpHost(nodes[0], Integer.valueOf(nodes[1]), "http"));
@@ -204,7 +188,7 @@ public class ElasticsearchRestClient extends DB {
   public Status insert(final String table, final String key, final Map<String, ByteIterator> values) {
     try {
       final Map<String, String> data = StringByteIterator.getStringMap(values);
-      data.put("key", key);
+      data.put(KEY, key);
 
       final Response response = restClient.performRequest(
           "POST",
@@ -224,7 +208,7 @@ public class ElasticsearchRestClient extends DB {
   }
 
   @Override
-  public Status delete(String table, String key) {
+  public Status delete(final String table, final String key) {
     try {
       final Response searchResponse = search(table, key);
       final int statusCode = searchResponse.getStatusLine().getStatusCode();
@@ -240,7 +224,8 @@ public class ElasticsearchRestClient extends DB {
       if (total == 0) {
         return Status.NOT_FOUND;
       }
-      @SuppressWarnings("unchecked") final Map<String, Object> hit = (Map<String, Object>)((List<Object>)hits.get("hits")).get(0);
+      @SuppressWarnings("unchecked") final Map<String, Object> hit =
+              (Map<String, Object>)((List<Object>)hits.get("hits")).get(0);
       @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source");
       final Response deleteResponse =
               restClient.performRequest("DELETE", "/" + indexKey + "/" + table + "/" + source.get("_id"));
@@ -276,7 +261,8 @@ public class ElasticsearchRestClient extends DB {
       if (total == 0) {
         return Status.NOT_FOUND;
       }
-      @SuppressWarnings("unchecked") final Map<String, Object> hit = (Map<String, Object>)((List<Object>)hits.get("hits")).get(0);
+      @SuppressWarnings("unchecked") final Map<String, Object> hit =
+              (Map<String, Object>)((List<Object>)hits.get("hits")).get(0);
       @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source");
       if (fields != null) {
         for (final String field : fields) {
@@ -284,7 +270,7 @@ public class ElasticsearchRestClient extends DB {
         }
       } else {
         for (final Map.Entry<String, Object> e : source.entrySet()) {
-          if ("key".equals(e.getKey())) {
+          if (KEY.equals(e.getKey())) {
             continue;
           }
           result.put(e.getKey(), new StringByteIterator((String) e.getValue()));
@@ -299,7 +285,7 @@ public class ElasticsearchRestClient extends DB {
   }
 
   @Override
-  public Status update(String table, String key, Map<String, ByteIterator> values) {
+  public Status update(final String table, final String key, final Map<String, ByteIterator> values) {
     try {
       final Response searchResponse = search(table, key);
       final int statusCode = searchResponse.getStatusLine().getStatusCode();
@@ -315,7 +301,8 @@ public class ElasticsearchRestClient extends DB {
       if (total == 0) {
         return Status.NOT_FOUND;
       }
-      @SuppressWarnings("unchecked") final Map<String, Object> hit = (Map<String, Object>) ((List<Object>) hits.get("hits")).get(0);
+      @SuppressWarnings("unchecked") final Map<String, Object> hit =
+              (Map<String, Object>) ((List<Object>) hits.get("hits")).get(0);
       @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>) hit.get("_source");
       for (final Map.Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) {
         source.put(entry.getKey(), entry.getValue());
@@ -338,18 +325,18 @@ public class ElasticsearchRestClient extends DB {
 
   @Override
   public Status scan(
-      String table,
-      String startkey,
-      int recordcount,
-      Set<String> fields,
-      Vector<HashMap<String, ByteIterator>> result) {
+      final String table,
+      final String startkey,
+      final int recordcount,
+      final Set<String> fields,
+      final Vector<HashMap<String, ByteIterator>> result) {
     try {
       final Response response;
       try (XContentBuilder builder = jsonBuilder()) {
         builder.startObject();
         builder.startObject("query");
         builder.startObject("range");
-        builder.startObject("key");
+        builder.startObject(KEY);
         builder.field("gte", startkey);
         builder.endObject();
         builder.endObject();
@@ -359,7 +346,8 @@ public class ElasticsearchRestClient extends DB {
         response = search(table, builder);
         @SuppressWarnings("unchecked") final Map<String, Object> map = map(response);
         @SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>)map.get("hits");
-        @SuppressWarnings("unchecked") final List<Map<String, Object>> list = (List<Map<String, Object>>) hits.get("hits");
+        @SuppressWarnings("unchecked") final List<Map<String, Object>> list =
+                (List<Map<String, Object>>) hits.get("hits");
 
         for (final Map<String, Object> hit : list) {
           @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source");
@@ -372,7 +360,7 @@ public class ElasticsearchRestClient extends DB {
           } else {
             entry = new HashMap<>(hit.size());
             for (final Map.Entry<String, Object> field : source.entrySet()) {
-              if ("key".equals(field.getKey())) {
+              if (KEY.equals(field.getKey())) {
                 continue;
               }
               entry.put(field.getKey(), new StringByteIterator((String) field.getValue()));
@@ -393,7 +381,7 @@ public class ElasticsearchRestClient extends DB {
       builder.startObject();
       builder.startObject("query");
       builder.startObject("term");
-      builder.field("key", key);
+      builder.field(KEY, key);
       builder.endObject();
       builder.endObject();
       builder.endObject();
-- 
GitLab