Skip to content
Snippets Groups Projects
Commit bc69e7eb authored by Jason Tedor's avatar Jason Tedor
Browse files

Elasticsearch 5: Complete REST implementation

This commit is an initial cut at a complete implementation using the
low-level Elasticsearch REST client.
parent 0eb22d13
No related branches found
No related tags found
No related merge requests found
......@@ -41,6 +41,7 @@ couchbase2:com.yahoo.ycsb.db.couchbase2.Couchbase2Client
dynamodb:com.yahoo.ycsb.db.DynamoDBClient
elasticsearch:com.yahoo.ycsb.db.ElasticsearchClient
elasticsearch5:com.yahoo.ycsb.db.elasticsearch5.ElasticsearchClient
elasticsearch5-rest:com.yahoo.ycsb.db.elasticsearch5.ElasticsearchRestClient
geode:com.yahoo.ycsb.db.GeodeClient
googlebigtable:com.yahoo.ycsb.db.GoogleBigtableClient
googledatastore:com.yahoo.ycsb.db.GoogleDatastoreClient
......
/**
/*
* Copyright (c) 2017 YCSB contributors. All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
......@@ -17,22 +17,38 @@
package com.yahoo.ycsb.db.elasticsearch5;
import com.yahoo.ycsb.*;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity;
import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.*;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.settings.Settings.Builder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* Elasticsearch REST client for YCSB framework.
......@@ -44,9 +60,14 @@ public class ElasticsearchRestClient extends DB {
private static final String DEFAULT_REMOTE_HOST = "localhost:9200";
private static final int NUMBER_OF_SHARDS = 1;
private static final int NUMBER_OF_REPLICAS = 0;
private String indexKey;
private RestClient restClient;
private String indexKey;
/**
*
* Initialize any state for this DB. Called once per DB instance; there is one
* DB instance per client thread.
*/
@Override
public void init() throws DBException {
final Properties props = getProperties();
......@@ -63,7 +84,14 @@ public class ElasticsearchRestClient extends DB {
// if properties file contains elasticsearch user defined properties
// add it to the settings file (will overwrite the defaults).
settings.put(props);
for (final Map.Entry<Object, Object> e : props.entrySet()) {
if (e.getKey() instanceof String) {
final String key = (String) e.getKey();
if (key.startsWith("es.setting.")) {
settings.put(key.substring("es.setting.".length()), e.getValue());
}
}
}
final String clusterName = settings.get("cluster.name");
System.err.println("Elasticsearch starting node = " + clusterName);
......@@ -78,23 +106,86 @@ public class ElasticsearchRestClient extends DB {
restClient = RestClient.builder(esHttpHosts.toArray(new HttpHost[esHttpHosts.size()])).build();
// final boolean exists =
// client.admin().indices()
// .exists(Requests.indicesExistsRequest(indexKey)).actionGet()
// .isExists();
// if (exists && newdb) {
// client.admin().indices().prepareDelete(indexKey).execute().actionGet();
// }
// if (!exists || newdb) {
// client.admin().indices().create(
// new CreateIndexRequest(indexKey)
// .settings(
// Settings.builder()
// .put("index.number_of_shards", numberOfShards)
// .put("index.number_of_replicas", numberOfReplicas)
// )).actionGet();
// }
// client.admin().cluster().health(new ClusterHealthRequest().waitForGreenStatus()).actionGet();
final Response existsResponse = performRequest(restClient, "HEAD", "/" + indexKey);
final boolean exists = existsResponse.getStatusLine().getStatusCode() == 200;
if (exists && newdb) {
final Response deleteResponse = performRequest(restClient, "DELETE", "/" + indexKey);
final int statusCode = deleteResponse.getStatusLine().getStatusCode();
if (statusCode != 200) {
throw new DBException("delete [" + indexKey + "] failed with status [" + statusCode + "]");
}
}
if (!exists || newdb) {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.startObject("settings");
builder.field("index.number_of_shards", numberOfShards);
builder.field("index.number_of_replicas", numberOfReplicas);
builder.endObject();
builder.endObject();
final Map<String, String> params = emptyMap();
final StringEntity entity = new StringEntity(builder.string());
final Response createResponse = performRequest(restClient, "PUT", "/" + indexKey, params, entity);
final int statusCode = createResponse.getStatusLine().getStatusCode();
if (statusCode != 200) {
throw new DBException("create [" + indexKey + "] failed with status [" + statusCode + "]");
}
} catch (final IOException e) {
throw new DBException(e);
}
}
final Map<String, String> params = Collections.singletonMap("wait_for_status", "green");
final Response healthResponse = performRequest(restClient, "GET", "/_cluster/health/" + indexKey, params);
final int healthStatusCode = healthResponse.getStatusLine().getStatusCode();
if (healthStatusCode != 200) {
throw new DBException("cluster health [" + indexKey + "] failed with status [" + healthStatusCode + "]");
}
}
private static Response performRequest(
final RestClient restClient,
final String method,
final String endpoint) throws DBException {
final Map<String, String> params = emptyMap();
return performRequest(restClient, method, endpoint, params);
}
private static Response performRequest(
final RestClient restClient,
final String method,
final String endpoint,
final Map<String, String> params) throws DBException {
return performRequest(restClient, method, endpoint, params, null);
}
private static Header[] emptyHeaders = new Header[0];
private static Response performRequest(
final RestClient restClient,
final String method,
final String endpoint,
final Map<String, String> params,
final HttpEntity entity) throws DBException {
try {
final Header[] headers;
if (entity != null) {
headers = new Header[]{new BasicHeader("content-type", ContentType.APPLICATION_JSON.toString())};
} else {
headers = emptyHeaders;
}
return restClient.performRequest(
method,
endpoint,
params,
entity,
headers);
} catch (final IOException e) {
e.printStackTrace();
throw new DBException(e);
}
}
@Override
......@@ -103,25 +194,30 @@ public class ElasticsearchRestClient extends DB {
try {
restClient.close();
restClient = null;
} catch (IOException e) {
} catch (final IOException e) {
throw new DBException(e);
}
}
}
@Override
public Status insert(String table, String key, Map<String, ByteIterator> values) {
public Status insert(final String table, final String key, final Map<String, ByteIterator> values) {
try {
Map<String, String> data = StringByteIterator.getStringMap(values);
final Map<String, String> data = StringByteIterator.getStringMap(values);
data.put("key", key);
Response response = restClient.performRequest(
HttpPut.METHOD_NAME,
final Response response = restClient.performRequest(
"POST",
"/" + indexKey + "/" + table + "/",
Collections.<String, String>emptyMap(),
new NStringEntity(new ObjectMapper().writeValueAsString(data), ContentType.APPLICATION_JSON));
return Status.OK;
} catch (Exception e) {
if (response.getStatusLine().getStatusCode() == 201) {
return Status.OK;
} else {
return Status.ERROR;
}
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
......@@ -130,74 +226,114 @@ public class ElasticsearchRestClient extends DB {
@Override
public Status delete(String table, String key) {
try {
Response response = restClient.performRequest(
HttpDelete.METHOD_NAME,
"/" + indexKey + "/" + table + "/" + key);
final Response searchResponse = search(table, key);
final int statusCode = searchResponse.getStatusLine().getStatusCode();
if (statusCode == 404) {
return Status.NOT_FOUND;
} else if (statusCode != 200) {
return Status.ERROR;
}
final Map<String, Object> map = map(searchResponse);
@SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>)map.get("hits");
final int total = (int)hits.get("total");
if (total == 0) {
return Status.NOT_FOUND;
}
@SuppressWarnings("unchecked") final Map<String, Object> hit = (Map<String, Object>)((List<Object>)hits.get("hits")).get(0);
@SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source");
final Response deleteResponse =
restClient.performRequest("DELETE", "/" + indexKey + "/" + table + "/" + source.get("_id"));
if (deleteResponse.getStatusLine().getStatusCode() != 200) {
return Status.ERROR;
}
return Status.OK;
} catch (Exception e) {
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
@Override
public Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) {
public Status read(
final String table,
final String key,
final Set<String> fields,
final Map<String, ByteIterator> result) {
try {
Response response = restClient.performRequest(HttpGet.METHOD_NAME, "/");
final Response searchResponse = search(table, key);
final int statusCode = searchResponse.getStatusLine().getStatusCode();
if (statusCode == 404) {
return Status.NOT_FOUND;
} else if (statusCode != 200) {
return Status.ERROR;
}
final Map<String, Object> map = map(searchResponse);
@SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>)map.get("hits");
final int total = (int)hits.get("total");
if (total == 0) {
return Status.NOT_FOUND;
}
@SuppressWarnings("unchecked") final Map<String, Object> hit = (Map<String, Object>)((List<Object>)hits.get("hits")).get(0);
@SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source");
if (fields != null) {
for (final String field : fields) {
result.put(field, new StringByteIterator((String) source.get(field)));
}
} else {
for (final Map.Entry<String, Object> e : source.entrySet()) {
if ("key".equals(e.getKey())) {
continue;
}
result.put(e.getKey(), new StringByteIterator((String) e.getValue()));
}
}
return Status.OK;
} catch (Exception e) {
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
// try {
// final GetResponse response = client.prepareGet(indexKey, table, key).execute().actionGet();
//
// if (response.isExists()) {
// if (fields != null) {
// for (String field : fields) {
// result.put(field, new StringByteIterator(
// (String) response.getSource().get(field)));
// }
// } else {
// for (String field : response.getSource().keySet()) {
// result.put(field, new StringByteIterator(
// (String) response.getSource().get(field)));
// }
// }
// return Status.OK;
// } else {
// return Status.NOT_FOUND;
// }
// } catch (Exception e) {
// e.printStackTrace();
// return Status.ERROR;
// }
}
@Override
public Status update(String table, String key, Map<String, ByteIterator> values) {
// try {
// final GetResponse response = client.prepareGet(indexKey, table, key).execute().actionGet();
//
// if (response.isExists()) {
// for (Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) {
// response.getSource().put(entry.getKey(), entry.getValue());
// }
//
// client.prepareIndex(indexKey, table, key).setSource(response.getSource()).execute().actionGet();
//
// return Status.OK;
// } else {
// return Status.NOT_FOUND;
// }
// } catch (Exception e) {
// e.printStackTrace();
// return Status.ERROR;
// }
return Status.NOT_IMPLEMENTED;
try {
final Response searchResponse = search(table, key);
final int statusCode = searchResponse.getStatusLine().getStatusCode();
if (statusCode == 404) {
return Status.NOT_FOUND;
} else if (statusCode != 200) {
return Status.ERROR;
}
final Map<String, Object> map = map(searchResponse);
@SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>) map.get("hits");
final int total = (int) hits.get("total");
if (total == 0) {
return Status.NOT_FOUND;
}
@SuppressWarnings("unchecked") final Map<String, Object> hit = (Map<String, Object>) ((List<Object>) hits.get("hits")).get(0);
@SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>) hit.get("_source");
for (final Map.Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) {
source.put(entry.getKey(), entry.getValue());
}
final Map<String, String> params = emptyMap();
final Response response = restClient.performRequest(
"PUT",
"/" + indexKey + "/" + table + "/" + source.get("_id"),
params,
new NStringEntity(new ObjectMapper().writeValueAsString(source), ContentType.APPLICATION_JSON));
if (response.getStatusLine().getStatusCode() != 200) {
return Status.ERROR;
}
return Status.OK;
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
@Override
......@@ -207,6 +343,77 @@ public class ElasticsearchRestClient extends DB {
int recordcount,
Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
return Status.NOT_IMPLEMENTED;
try {
final Response response;
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.startObject("query");
builder.startObject("range");
builder.startObject("key");
builder.field("gte", startkey);
builder.endObject();
builder.endObject();
builder.endObject();
builder.field("size", recordcount);
builder.endObject();
response = search(table, builder);
@SuppressWarnings("unchecked") final Map<String, Object> map = map(response);
@SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>)map.get("hits");
@SuppressWarnings("unchecked") final List<Map<String, Object>> list = (List<Map<String, Object>>) hits.get("hits");
for (final Map<String, Object> hit : list) {
@SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source");
final HashMap<String, ByteIterator> entry;
if (fields != null) {
entry = new HashMap<>(fields.size());
for (final String field : fields) {
entry.put(field, new StringByteIterator((String) source.get(field)));
}
} else {
entry = new HashMap<>(hit.size());
for (final Map.Entry<String, Object> field : source.entrySet()) {
if ("key".equals(field.getKey())) {
continue;
}
entry.put(field.getKey(), new StringByteIterator((String) field.getValue()));
}
}
result.add(entry);
}
}
return Status.OK;
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
private Response search(final String table, final String key) throws IOException {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
builder.startObject("query");
builder.startObject("term");
builder.field("key", key);
builder.endObject();
builder.endObject();
builder.endObject();
return search(table, builder);
}
}
private Response search(final String table, final XContentBuilder builder) throws IOException {
final Map<String, String> params = emptyMap();
final StringEntity entity = new StringEntity(builder.string());
final Header header = new BasicHeader("content-type", ContentType.APPLICATION_JSON.toString());
return restClient.performRequest("GET", "/" + indexKey + "/" + table + "/_search", params, entity, header);
}
private Map<String, Object> map(final Response response) throws IOException {
try (InputStream is = response.getEntity().getContent()) {
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> map = mapper.readValue(is, Map.class);
return map;
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment