diff --git a/bin/bindings.properties b/bin/bindings.properties index 3121e83bf6f9bc8ead0417a69e8a693d8e9b79bc..b37973905920a0be90734c5d1bc57c497d8f9b34 100644 --- a/bin/bindings.properties +++ b/bin/bindings.properties @@ -41,6 +41,7 @@ couchbase2:com.yahoo.ycsb.db.couchbase2.Couchbase2Client dynamodb:com.yahoo.ycsb.db.DynamoDBClient elasticsearch:com.yahoo.ycsb.db.ElasticsearchClient elasticsearch5:com.yahoo.ycsb.db.elasticsearch5.ElasticsearchClient +elasticsearch5-rest:com.yahoo.ycsb.db.elasticsearch5.ElasticsearchRestClient geode:com.yahoo.ycsb.db.GeodeClient googlebigtable:com.yahoo.ycsb.db.GoogleBigtableClient googledatastore:com.yahoo.ycsb.db.GoogleDatastoreClient diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java index 2ecdb40c0290137fa07d40718c317af2294ea21f..d1c12d633587cf99e07492e9d05bbcbe43e0bc2e 100644 --- a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java +++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java @@ -1,4 +1,4 @@ -/** +/* * Copyright (c) 2017 YCSB contributors. All rights reserved. * <p> * Licensed under the Apache License, Version 2.0 (the "License"); you @@ -17,22 +17,38 @@ package com.yahoo.ycsb.db.elasticsearch5; -import com.yahoo.ycsb.*; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import org.apache.http.Header; +import org.apache.http.HttpEntity; import org.apache.http.HttpHost; -import org.apache.http.client.methods.HttpDelete; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.message.BasicHeader; import org.apache.http.nio.entity.NStringEntity; import org.codehaus.jackson.map.ObjectMapper; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; -import java.util.*; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Vector; +import static java.util.Collections.emptyMap; import static org.elasticsearch.common.settings.Settings.Builder; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; /** * Elasticsearch REST client for YCSB framework. @@ -44,9 +60,14 @@ public class ElasticsearchRestClient extends DB { private static final String DEFAULT_REMOTE_HOST = "localhost:9200"; private static final int NUMBER_OF_SHARDS = 1; private static final int NUMBER_OF_REPLICAS = 0; - private String indexKey; private RestClient restClient; - + private String indexKey; + + /** + * + * Initialize any state for this DB. Called once per DB instance; there is one + * DB instance per client thread. + */ @Override public void init() throws DBException { final Properties props = getProperties(); @@ -63,7 +84,14 @@ public class ElasticsearchRestClient extends DB { // if properties file contains elasticsearch user defined properties // add it to the settings file (will overwrite the defaults). - settings.put(props); + for (final Map.Entry<Object, Object> e : props.entrySet()) { + if (e.getKey() instanceof String) { + final String key = (String) e.getKey(); + if (key.startsWith("es.setting.")) { + settings.put(key.substring("es.setting.".length()), e.getValue()); + } + } + } final String clusterName = settings.get("cluster.name"); System.err.println("Elasticsearch starting node = " + clusterName); @@ -78,23 +106,86 @@ public class ElasticsearchRestClient extends DB { restClient = RestClient.builder(esHttpHosts.toArray(new HttpHost[esHttpHosts.size()])).build(); -// final boolean exists = -// client.admin().indices() -// .exists(Requests.indicesExistsRequest(indexKey)).actionGet() -// .isExists(); -// if (exists && newdb) { -// client.admin().indices().prepareDelete(indexKey).execute().actionGet(); -// } -// if (!exists || newdb) { -// client.admin().indices().create( -// new CreateIndexRequest(indexKey) -// .settings( -// Settings.builder() -// .put("index.number_of_shards", numberOfShards) -// .put("index.number_of_replicas", numberOfReplicas) -// )).actionGet(); -// } -// client.admin().cluster().health(new ClusterHealthRequest().waitForGreenStatus()).actionGet(); + final Response existsResponse = performRequest(restClient, "HEAD", "/" + indexKey); + final boolean exists = existsResponse.getStatusLine().getStatusCode() == 200; + + if (exists && newdb) { + final Response deleteResponse = performRequest(restClient, "DELETE", "/" + indexKey); + final int statusCode = deleteResponse.getStatusLine().getStatusCode(); + if (statusCode != 200) { + throw new DBException("delete [" + indexKey + "] failed with status [" + statusCode + "]"); + } + } + + if (!exists || newdb) { + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + builder.startObject("settings"); + builder.field("index.number_of_shards", numberOfShards); + builder.field("index.number_of_replicas", numberOfReplicas); + builder.endObject(); + builder.endObject(); + final Map<String, String> params = emptyMap(); + final StringEntity entity = new StringEntity(builder.string()); + final Response createResponse = performRequest(restClient, "PUT", "/" + indexKey, params, entity); + final int statusCode = createResponse.getStatusLine().getStatusCode(); + if (statusCode != 200) { + throw new DBException("create [" + indexKey + "] failed with status [" + statusCode + "]"); + } + } catch (final IOException e) { + throw new DBException(e); + } + } + + final Map<String, String> params = Collections.singletonMap("wait_for_status", "green"); + final Response healthResponse = performRequest(restClient, "GET", "/_cluster/health/" + indexKey, params); + final int healthStatusCode = healthResponse.getStatusLine().getStatusCode(); + if (healthStatusCode != 200) { + throw new DBException("cluster health [" + indexKey + "] failed with status [" + healthStatusCode + "]"); + } + } + + private static Response performRequest( + final RestClient restClient, + final String method, + final String endpoint) throws DBException { + final Map<String, String> params = emptyMap(); + return performRequest(restClient, method, endpoint, params); + } + + private static Response performRequest( + final RestClient restClient, + final String method, + final String endpoint, + final Map<String, String> params) throws DBException { + return performRequest(restClient, method, endpoint, params, null); + } + + private static Header[] emptyHeaders = new Header[0]; + + private static Response performRequest( + final RestClient restClient, + final String method, + final String endpoint, + final Map<String, String> params, + final HttpEntity entity) throws DBException { + try { + final Header[] headers; + if (entity != null) { + headers = new Header[]{new BasicHeader("content-type", ContentType.APPLICATION_JSON.toString())}; + } else { + headers = emptyHeaders; + } + return restClient.performRequest( + method, + endpoint, + params, + entity, + headers); + } catch (final IOException e) { + e.printStackTrace(); + throw new DBException(e); + } } @Override @@ -103,25 +194,30 @@ public class ElasticsearchRestClient extends DB { try { restClient.close(); restClient = null; - } catch (IOException e) { + } catch (final IOException e) { throw new DBException(e); } } } @Override - public Status insert(String table, String key, Map<String, ByteIterator> values) { + public Status insert(final String table, final String key, final Map<String, ByteIterator> values) { try { - Map<String, String> data = StringByteIterator.getStringMap(values); + final Map<String, String> data = StringByteIterator.getStringMap(values); + data.put("key", key); - Response response = restClient.performRequest( - HttpPut.METHOD_NAME, + final Response response = restClient.performRequest( + "POST", "/" + indexKey + "/" + table + "/", Collections.<String, String>emptyMap(), new NStringEntity(new ObjectMapper().writeValueAsString(data), ContentType.APPLICATION_JSON)); - return Status.OK; - } catch (Exception e) { + if (response.getStatusLine().getStatusCode() == 201) { + return Status.OK; + } else { + return Status.ERROR; + } + } catch (final Exception e) { e.printStackTrace(); return Status.ERROR; } @@ -130,74 +226,114 @@ public class ElasticsearchRestClient extends DB { @Override public Status delete(String table, String key) { try { - Response response = restClient.performRequest( - HttpDelete.METHOD_NAME, - "/" + indexKey + "/" + table + "/" + key); + final Response searchResponse = search(table, key); + final int statusCode = searchResponse.getStatusLine().getStatusCode(); + if (statusCode == 404) { + return Status.NOT_FOUND; + } else if (statusCode != 200) { + return Status.ERROR; + } + + final Map<String, Object> map = map(searchResponse); + @SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>)map.get("hits"); + final int total = (int)hits.get("total"); + if (total == 0) { + return Status.NOT_FOUND; + } + @SuppressWarnings("unchecked") final Map<String, Object> hit = (Map<String, Object>)((List<Object>)hits.get("hits")).get(0); + @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source"); + final Response deleteResponse = + restClient.performRequest("DELETE", "/" + indexKey + "/" + table + "/" + source.get("_id")); + if (deleteResponse.getStatusLine().getStatusCode() != 200) { + return Status.ERROR; + } return Status.OK; - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); return Status.ERROR; } } @Override - public Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) { + public Status read( + final String table, + final String key, + final Set<String> fields, + final Map<String, ByteIterator> result) { try { - Response response = restClient.performRequest(HttpGet.METHOD_NAME, "/"); + final Response searchResponse = search(table, key); + final int statusCode = searchResponse.getStatusLine().getStatusCode(); + if (statusCode == 404) { + return Status.NOT_FOUND; + } else if (statusCode != 200) { + return Status.ERROR; + } + + final Map<String, Object> map = map(searchResponse); + @SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>)map.get("hits"); + final int total = (int)hits.get("total"); + if (total == 0) { + return Status.NOT_FOUND; + } + @SuppressWarnings("unchecked") final Map<String, Object> hit = (Map<String, Object>)((List<Object>)hits.get("hits")).get(0); + @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source"); + if (fields != null) { + for (final String field : fields) { + result.put(field, new StringByteIterator((String) source.get(field))); + } + } else { + for (final Map.Entry<String, Object> e : source.entrySet()) { + if ("key".equals(e.getKey())) { + continue; + } + result.put(e.getKey(), new StringByteIterator((String) e.getValue())); + } + } return Status.OK; - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); return Status.ERROR; } - -// try { -// final GetResponse response = client.prepareGet(indexKey, table, key).execute().actionGet(); -// -// if (response.isExists()) { -// if (fields != null) { -// for (String field : fields) { -// result.put(field, new StringByteIterator( -// (String) response.getSource().get(field))); -// } -// } else { -// for (String field : response.getSource().keySet()) { -// result.put(field, new StringByteIterator( -// (String) response.getSource().get(field))); -// } -// } -// return Status.OK; -// } else { -// return Status.NOT_FOUND; -// } -// } catch (Exception e) { -// e.printStackTrace(); -// return Status.ERROR; -// } } @Override public Status update(String table, String key, Map<String, ByteIterator> values) { -// try { -// final GetResponse response = client.prepareGet(indexKey, table, key).execute().actionGet(); -// -// if (response.isExists()) { -// for (Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) { -// response.getSource().put(entry.getKey(), entry.getValue()); -// } -// -// client.prepareIndex(indexKey, table, key).setSource(response.getSource()).execute().actionGet(); -// -// return Status.OK; -// } else { -// return Status.NOT_FOUND; -// } -// } catch (Exception e) { -// e.printStackTrace(); -// return Status.ERROR; -// } - return Status.NOT_IMPLEMENTED; + try { + final Response searchResponse = search(table, key); + final int statusCode = searchResponse.getStatusLine().getStatusCode(); + if (statusCode == 404) { + return Status.NOT_FOUND; + } else if (statusCode != 200) { + return Status.ERROR; + } + + final Map<String, Object> map = map(searchResponse); + @SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>) map.get("hits"); + final int total = (int) hits.get("total"); + if (total == 0) { + return Status.NOT_FOUND; + } + @SuppressWarnings("unchecked") final Map<String, Object> hit = (Map<String, Object>) ((List<Object>) hits.get("hits")).get(0); + @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>) hit.get("_source"); + for (final Map.Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) { + source.put(entry.getKey(), entry.getValue()); + } + final Map<String, String> params = emptyMap(); + final Response response = restClient.performRequest( + "PUT", + "/" + indexKey + "/" + table + "/" + source.get("_id"), + params, + new NStringEntity(new ObjectMapper().writeValueAsString(source), ContentType.APPLICATION_JSON)); + if (response.getStatusLine().getStatusCode() != 200) { + return Status.ERROR; + } + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } } @Override @@ -207,6 +343,77 @@ public class ElasticsearchRestClient extends DB { int recordcount, Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { - return Status.NOT_IMPLEMENTED; + try { + final Response response; + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + builder.startObject("query"); + builder.startObject("range"); + builder.startObject("key"); + builder.field("gte", startkey); + builder.endObject(); + builder.endObject(); + builder.endObject(); + builder.field("size", recordcount); + builder.endObject(); + response = search(table, builder); + @SuppressWarnings("unchecked") final Map<String, Object> map = map(response); + @SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>)map.get("hits"); + @SuppressWarnings("unchecked") final List<Map<String, Object>> list = (List<Map<String, Object>>) hits.get("hits"); + + for (final Map<String, Object> hit : list) { + @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source"); + final HashMap<String, ByteIterator> entry; + if (fields != null) { + entry = new HashMap<>(fields.size()); + for (final String field : fields) { + entry.put(field, new StringByteIterator((String) source.get(field))); + } + } else { + entry = new HashMap<>(hit.size()); + for (final Map.Entry<String, Object> field : source.entrySet()) { + if ("key".equals(field.getKey())) { + continue; + } + entry.put(field.getKey(), new StringByteIterator((String) field.getValue())); + } + } + result.add(entry); + } + } + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } + } + + private Response search(final String table, final String key) throws IOException { + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + builder.startObject("query"); + builder.startObject("term"); + builder.field("key", key); + builder.endObject(); + builder.endObject(); + builder.endObject(); + return search(table, builder); + } + } + + private Response search(final String table, final XContentBuilder builder) throws IOException { + final Map<String, String> params = emptyMap(); + final StringEntity entity = new StringEntity(builder.string()); + final Header header = new BasicHeader("content-type", ContentType.APPLICATION_JSON.toString()); + return restClient.performRequest("GET", "/" + indexKey + "/" + table + "/_search", params, entity, header); } + + private Map<String, Object> map(final Response response) throws IOException { + try (InputStream is = response.getEntity().getContent()) { + final ObjectMapper mapper = new ObjectMapper(); + @SuppressWarnings("unchecked") final Map<String, Object> map = mapper.readValue(is, Map.class); + return map; + } + } + }