diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/Elasticsearch5.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/Elasticsearch5.java new file mode 100644 index 0000000000000000000000000000000000000000..a67e0447be9ddf42b3c78011415cf6317eaf780b --- /dev/null +++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/Elasticsearch5.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2017 YCSB contributors. All rights reserved. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.elasticsearch5; + +import java.util.Properties; + +final class Elasticsearch5 { + + private Elasticsearch5() { + + } + + static final String KEY = "key"; + + static int parseIntegerProperty(final Properties properties, final String key, final int defaultValue) { + final String value = properties.getProperty(key); + return value == null ? defaultValue : Integer.parseInt(value); + } + +} diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java index a34dc0a26a1d57b4f84ba9a8c53929782c2a572e..cf8100dc84e73ed025afcaa92ab6b5d857614cc5 100644 --- a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java +++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; @@ -48,6 +47,8 @@ import java.util.Properties; import java.util.Set; import java.util.Vector; +import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.KEY; +import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.parseIntegerProperty; import static org.elasticsearch.common.settings.Settings.Builder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -61,7 +62,7 @@ public class ElasticsearchClient extends DB { private static final String DEFAULT_REMOTE_HOST = "localhost:9300"; private static final int NUMBER_OF_SHARDS = 1; private static final int NUMBER_OF_REPLICAS = 0; - private Client client; + private TransportClient client; private String indexKey; /** @@ -75,11 +76,11 @@ public class ElasticsearchClient extends DB { this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY); - int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS); - int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS); + final int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS); + final int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS); - Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false")); - Builder settings = Settings.builder().put("cluster.name", DEFAULT_CLUSTER_NAME); + final Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false")); + final Builder settings = Settings.builder().put("cluster.name", DEFAULT_CLUSTER_NAME); // if properties file contains elasticsearch user defined properties // add it to the settings file (will overwrite the defaults). @@ -92,30 +93,33 @@ public class ElasticsearchClient extends DB { } } final String clusterName = settings.get("cluster.name"); - System.err.println("Elasticsearch starting node = " + clusterName); + System.out.println("Elasticsearch cluster name = " + clusterName); settings.put("client.transport.sniff", true) .put("client.transport.ignore_cluster_name", false) .put("client.transport.ping_timeout", "30s") .put("client.transport.nodes_sampler_interval", "30s"); // Default it to localhost:9300 - String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(","); + final String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(","); System.out.println("Elasticsearch Remote Hosts = " + props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST)); - TransportClient tClient = new PreBuiltTransportClient(settings.build()); + client = new PreBuiltTransportClient(settings.build()); for (String h : nodeList) { String[] nodes = h.split(":"); + + final InetAddress address; try { - tClient.addTransportAddress(new InetSocketTransportAddress( - InetAddress.getByName(nodes[0]), - Integer.parseInt(nodes[1]) - )); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Unable to parse port number.", e); + address = InetAddress.getByName(nodes[0]); } catch (UnknownHostException e) { - throw new IllegalArgumentException("Unable to Identify host.", e); + throw new IllegalArgumentException("unable to identity host [" + nodes[0]+ "]", e); + } + final int port; + try { + port = Integer.parseInt(nodes[1]); + } catch (final NumberFormatException e) { + throw new IllegalArgumentException("unable to parse port [" + nodes[1] + "]", e); } + client.addTransportAddress(new InetSocketTransportAddress(address, port)); } - client = tClient; final boolean exists = client.admin().indices() @@ -136,11 +140,6 @@ public class ElasticsearchClient extends DB { client.admin().cluster().health(new ClusterHealthRequest().waitForGreenStatus()).actionGet(); } - private int parseIntegerProperty(final Properties properties, final String key, final int defaultValue) { - final String value = properties.getProperty(key); - return value == null ? defaultValue : Integer.parseInt(value); - } - @Override public void cleanup() throws DBException { if (client != null) { @@ -158,8 +157,7 @@ public class ElasticsearchClient extends DB { for (final Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) { doc.field(entry.getKey(), entry.getValue()); } - - doc.field("key", key); + doc.field(KEY, key); doc.endObject(); client.prepareIndex(indexKey, table).setSource(doc).execute().actionGet(); @@ -184,9 +182,9 @@ public class ElasticsearchClient extends DB { final DeleteResponse deleteResponse = client.prepareDelete(indexKey, table, id).execute().actionGet(); if (deleteResponse.status().equals(RestStatus.NOT_FOUND)) { return Status.NOT_FOUND; - } else { - return Status.OK; } + + return Status.OK; } catch (final Exception e) { e.printStackTrace(); return Status.ERROR; @@ -213,14 +211,14 @@ public class ElasticsearchClient extends DB { } } else { for (final Map.Entry<String, SearchHitField> e : hit.getFields().entrySet()) { - if ("key".equals(e.getKey())) { + if (KEY.equals(e.getKey())) { continue; } result.put(e.getKey(), new StringByteIterator((String) e.getValue().getValue())); } } - return Status.OK; + return Status.OK; } catch (final Exception e) { e.printStackTrace(); return Status.ERROR; @@ -243,7 +241,6 @@ public class ElasticsearchClient extends DB { client.prepareIndex(indexKey, table, hit.getId()).setSource(hit.getSource()).get(); return Status.OK; - } catch (final Exception e) { e.printStackTrace(); return Status.ERROR; @@ -258,7 +255,7 @@ public class ElasticsearchClient extends DB { final Set<String> fields, final Vector<HashMap<String, ByteIterator>> result) { try { - final RangeQueryBuilder query = new RangeQueryBuilder("key").gte(startkey); + final RangeQueryBuilder query = new RangeQueryBuilder(KEY).gte(startkey); final SearchResponse response = client.prepareSearch(indexKey).setQuery(query).setSize(recordcount).get(); for (final SearchHit hit : response.getHits()) { @@ -271,7 +268,7 @@ public class ElasticsearchClient extends DB { } else { entry = new HashMap<>(hit.getFields().size()); for (final Map.Entry<String, SearchHitField> field : hit.getFields().entrySet()) { - if ("key".equals(field.getKey())) { + if (KEY.equals(field.getKey())) { continue; } entry.put(field.getKey(), new StringByteIterator((String) field.getValue().getValue())); @@ -288,7 +285,7 @@ public class ElasticsearchClient extends DB { private SearchResponse search(final String table, final String key) { - return client.prepareSearch(indexKey).setTypes(table).setQuery(new TermQueryBuilder("key", key)).get(); + return client.prepareSearch(indexKey).setTypes(table).setQuery(new TermQueryBuilder(KEY, key)).get(); } } diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java index d1c12d633587cf99e07492e9d05bbcbe43e0bc2e..07952c1020fe786d185eba3e457734ae97a9f7bb 100644 --- a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java +++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java @@ -32,7 +32,6 @@ import org.apache.http.nio.entity.NStringEntity; import org.codehaus.jackson.map.ObjectMapper; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; @@ -46,8 +45,9 @@ import java.util.Properties; import java.util.Set; import java.util.Vector; +import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.KEY; +import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.parseIntegerProperty; import static java.util.Collections.emptyMap; -import static org.elasticsearch.common.settings.Settings.Builder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; /** @@ -74,31 +74,15 @@ public class ElasticsearchRestClient extends DB { this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY); - int numberOfShards = Integer.valueOf(props.getProperty("es.number_of_shards", - String.valueOf(NUMBER_OF_SHARDS))); - int numberOfReplicas = Integer.valueOf(props.getProperty("es.number_of_replicas", - String.valueOf(NUMBER_OF_REPLICAS))); - - Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false")); - Builder settings = Settings.builder().put("cluster.name", DEFAULT_CLUSTER_NAME); - - // if properties file contains elasticsearch user defined properties - // add it to the settings file (will overwrite the defaults). - for (final Map.Entry<Object, Object> e : props.entrySet()) { - if (e.getKey() instanceof String) { - final String key = (String) e.getKey(); - if (key.startsWith("es.setting.")) { - settings.put(key.substring("es.setting.".length()), e.getValue()); - } - } - } - final String clusterName = settings.get("cluster.name"); - System.err.println("Elasticsearch starting node = " + clusterName); + final int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS); + final int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS); + + final Boolean newdb = Boolean.parseBoolean(props.getProperty("es.newdb", "false")); - String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(","); + final String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(","); System.out.println("Elasticsearch Remote Hosts = " + props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST)); - List<HttpHost> esHttpHosts = new ArrayList<>(nodeList.length); + final List<HttpHost> esHttpHosts = new ArrayList<>(nodeList.length); for (String h : nodeList) { String[] nodes = h.split(":"); esHttpHosts.add(new HttpHost(nodes[0], Integer.valueOf(nodes[1]), "http")); @@ -204,7 +188,7 @@ public class ElasticsearchRestClient extends DB { public Status insert(final String table, final String key, final Map<String, ByteIterator> values) { try { final Map<String, String> data = StringByteIterator.getStringMap(values); - data.put("key", key); + data.put(KEY, key); final Response response = restClient.performRequest( "POST", @@ -224,7 +208,7 @@ public class ElasticsearchRestClient extends DB { } @Override - public Status delete(String table, String key) { + public Status delete(final String table, final String key) { try { final Response searchResponse = search(table, key); final int statusCode = searchResponse.getStatusLine().getStatusCode(); @@ -240,7 +224,8 @@ public class ElasticsearchRestClient extends DB { if (total == 0) { return Status.NOT_FOUND; } - @SuppressWarnings("unchecked") final Map<String, Object> hit = (Map<String, Object>)((List<Object>)hits.get("hits")).get(0); + @SuppressWarnings("unchecked") final Map<String, Object> hit = + (Map<String, Object>)((List<Object>)hits.get("hits")).get(0); @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source"); final Response deleteResponse = restClient.performRequest("DELETE", "/" + indexKey + "/" + table + "/" + source.get("_id")); @@ -276,7 +261,8 @@ public class ElasticsearchRestClient extends DB { if (total == 0) { return Status.NOT_FOUND; } - @SuppressWarnings("unchecked") final Map<String, Object> hit = (Map<String, Object>)((List<Object>)hits.get("hits")).get(0); + @SuppressWarnings("unchecked") final Map<String, Object> hit = + (Map<String, Object>)((List<Object>)hits.get("hits")).get(0); @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source"); if (fields != null) { for (final String field : fields) { @@ -284,7 +270,7 @@ public class ElasticsearchRestClient extends DB { } } else { for (final Map.Entry<String, Object> e : source.entrySet()) { - if ("key".equals(e.getKey())) { + if (KEY.equals(e.getKey())) { continue; } result.put(e.getKey(), new StringByteIterator((String) e.getValue())); @@ -299,7 +285,7 @@ public class ElasticsearchRestClient extends DB { } @Override - public Status update(String table, String key, Map<String, ByteIterator> values) { + public Status update(final String table, final String key, final Map<String, ByteIterator> values) { try { final Response searchResponse = search(table, key); final int statusCode = searchResponse.getStatusLine().getStatusCode(); @@ -315,7 +301,8 @@ public class ElasticsearchRestClient extends DB { if (total == 0) { return Status.NOT_FOUND; } - @SuppressWarnings("unchecked") final Map<String, Object> hit = (Map<String, Object>) ((List<Object>) hits.get("hits")).get(0); + @SuppressWarnings("unchecked") final Map<String, Object> hit = + (Map<String, Object>) ((List<Object>) hits.get("hits")).get(0); @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>) hit.get("_source"); for (final Map.Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) { source.put(entry.getKey(), entry.getValue()); @@ -338,18 +325,18 @@ public class ElasticsearchRestClient extends DB { @Override public Status scan( - String table, - String startkey, - int recordcount, - Set<String> fields, - Vector<HashMap<String, ByteIterator>> result) { + final String table, + final String startkey, + final int recordcount, + final Set<String> fields, + final Vector<HashMap<String, ByteIterator>> result) { try { final Response response; try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); builder.startObject("query"); builder.startObject("range"); - builder.startObject("key"); + builder.startObject(KEY); builder.field("gte", startkey); builder.endObject(); builder.endObject(); @@ -359,7 +346,8 @@ public class ElasticsearchRestClient extends DB { response = search(table, builder); @SuppressWarnings("unchecked") final Map<String, Object> map = map(response); @SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>)map.get("hits"); - @SuppressWarnings("unchecked") final List<Map<String, Object>> list = (List<Map<String, Object>>) hits.get("hits"); + @SuppressWarnings("unchecked") final List<Map<String, Object>> list = + (List<Map<String, Object>>) hits.get("hits"); for (final Map<String, Object> hit : list) { @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source"); @@ -372,7 +360,7 @@ public class ElasticsearchRestClient extends DB { } else { entry = new HashMap<>(hit.size()); for (final Map.Entry<String, Object> field : source.entrySet()) { - if ("key".equals(field.getKey())) { + if (KEY.equals(field.getKey())) { continue; } entry.put(field.getKey(), new StringByteIterator((String) field.getValue())); @@ -393,7 +381,7 @@ public class ElasticsearchRestClient extends DB { builder.startObject(); builder.startObject("query"); builder.startObject("term"); - builder.field("key", key); + builder.field(KEY, key); builder.endObject(); builder.endObject(); builder.endObject();