Skip to content
Snippets Groups Projects
Commit e47e90e4 authored by Jason Tedor's avatar Jason Tedor
Browse files

Elasticsearch 5: Use auto-IDs and implements scan

This commit refactors the indexing of documents in the Elasticsearch 5
binding to use auto-generated IDs, instead indexing the key field as a
dedicated field rather than using it as the ID. This enables us to
implement scan functionality which we add in this commit as well.
parent d66e856a
No related branches found
No related tags found
No related merge requests found
......@@ -25,15 +25,18 @@ import com.yahoo.ycsb.StringByteIterator;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.node.Node;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
......@@ -124,7 +127,7 @@ public class ElasticsearchClient extends DB {
.exists(Requests.indicesExistsRequest(indexKey)).actionGet()
.isExists();
if (exists && newdb) {
client.admin().indices().prepareDelete(indexKey).execute().actionGet();
client.admin().indices().prepareDelete(indexKey).get();
}
if (!exists || newdb) {
client.admin().indices().create(
......@@ -138,8 +141,8 @@ public class ElasticsearchClient extends DB {
client.admin().cluster().health(new ClusterHealthRequest().waitForGreenStatus()).actionGet();
}
private int parseIntegerProperty(Properties properties, String key, int defaultValue) {
String value = properties.getProperty(key);
private int parseIntegerProperty(final Properties properties, final String key, final int defaultValue) {
final String value = properties.getProperty(key);
return value == null ? defaultValue : Integer.parseInt(value);
}
......@@ -154,82 +157,96 @@ public class ElasticsearchClient extends DB {
@Override
public Status insert(String table, String key, Map<String, ByteIterator> values) {
try {
final XContentBuilder doc = jsonBuilder().startObject();
final XContentBuilder doc = jsonBuilder();
for (Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) {
doc.startObject();
for (final Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) {
doc.field(entry.getKey(), entry.getValue());
}
doc.field("key", key);
doc.endObject();
client.prepareIndex(indexKey, table, key).setSource(doc).execute().actionGet();
client.prepareIndex(indexKey, table).setSource(doc).execute().actionGet();
return Status.OK;
} catch (Exception e) {
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
@Override
public Status delete(String table, String key) {
public Status delete(final String table, final String key) {
try {
DeleteResponse response = client.prepareDelete(indexKey, table, key).execute().actionGet();
if (response.status().equals(RestStatus.NOT_FOUND)) {
final SearchResponse searchResponse = search(key);
if (searchResponse.getHits().totalHits == 0) {
return Status.NOT_FOUND;
}
final String id = searchResponse.getHits().getAt(0).getId();
final DeleteResponse deleteResponse = client.prepareDelete(indexKey, table, id).execute().actionGet();
if (deleteResponse.status().equals(RestStatus.NOT_FOUND)) {
return Status.NOT_FOUND;
} else {
return Status.OK;
}
} catch (Exception e) {
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
@Override
public Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) {
public Status read(
final String table,
final String key,
final Set<String> fields,
final Map<String, ByteIterator> result) {
try {
final GetResponse response = client.prepareGet(indexKey, table, key).execute().actionGet();
final SearchResponse searchResponse = search(key);
if (searchResponse.getHits().totalHits == 0) {
return Status.NOT_FOUND;
}
if (response.isExists()) {
if (fields != null) {
for (String field : fields) {
result.put(field, new StringByteIterator(
(String) response.getSource().get(field)));
}
} else {
for (String field : response.getSource().keySet()) {
result.put(field, new StringByteIterator(
(String) response.getSource().get(field)));
}
final SearchHit hit = searchResponse.getHits().getAt(0);
if (fields != null) {
for (String field : fields) {
result.put(field, new StringByteIterator(
(String) hit.getField(field).getValue()));
}
return Status.OK;
} else {
return Status.NOT_FOUND;
for (final Map.Entry<String, SearchHitField> e : hit.getFields().entrySet()) {
result.put(e.getKey(), new StringByteIterator((String) e.getValue().getValue()));
}
}
} catch (Exception e) {
return Status.OK;
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
@Override
public Status update(String table, String key, Map<String, ByteIterator> values) {
public Status update(final String table, final String key, final Map<String, ByteIterator> values) {
try {
final GetResponse response = client.prepareGet(indexKey, table, key).execute().actionGet();
final SearchResponse response = search(key);
if (response.getHits().totalHits == 0) {
return Status.NOT_FOUND;
}
if (response.isExists()) {
for (Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) {
response.getSource().put(entry.getKey(), entry.getValue());
}
final SearchHit hit = response.getHits().getAt(0);
for (final Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) {
hit.getSource().put(entry.getKey(), entry.getValue());
}
client.prepareIndex(indexKey, table, key).setSource(response.getSource()).execute().actionGet();
client.prepareIndex(indexKey, table, key).setSource(hit.getSource()).get();
return Status.OK;
} else {
return Status.NOT_FOUND;
}
} catch (Exception e) {
return Status.OK;
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
......@@ -237,11 +254,40 @@ public class ElasticsearchClient extends DB {
@Override
public Status scan(
String table,
String startkey,
int recordcount,
Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
return Status.NOT_IMPLEMENTED;
final String table,
final String startkey,
final int recordcount,
final Set<String> fields,
final Vector<HashMap<String, ByteIterator>> result) {
try {
final RangeQueryBuilder query = new RangeQueryBuilder("key").gte(startkey);
final SearchResponse response = client.prepareSearch(indexKey).setQuery(query).setSize(recordcount).get();
for (final SearchHit hit : response.getHits()) {
final HashMap<String, ByteIterator> entry;
if (fields != null) {
entry = new HashMap<>(fields.size());
for (final String field : fields) {
entry.put(field, new StringByteIterator((String) hit.getSource().get(field)));
}
} else {
entry = new HashMap<>(hit.getFields().size());
for (final Map.Entry<String, SearchHitField> field : hit.getFields().entrySet()) {
entry.put(field.getKey(), new StringByteIterator((String) field.getValue().getValue()));
}
}
result.add(entry);
}
return Status.OK;
} catch (final Exception e) {
e.printStackTrace();
return Status.ERROR;
}
}
private SearchResponse search(final String key) {
return client.prepareSearch(indexKey).setQuery(new TermQueryBuilder("key", key)).get();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment