diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java index 386ba7c8054ff6637d90e1a5d081186f9bfa0458..2986f88c27af0d96df95e52c960082baf7f124b2 100644 --- a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java +++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java @@ -25,15 +25,18 @@ import com.yahoo.ycsb.StringByteIterator; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.delete.DeleteResponse; -import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.node.Node; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHitField; import org.elasticsearch.transport.client.PreBuiltTransportClient; import java.net.InetAddress; @@ -124,7 +127,7 @@ public class ElasticsearchClient extends DB { .exists(Requests.indicesExistsRequest(indexKey)).actionGet() .isExists(); if (exists && newdb) { - client.admin().indices().prepareDelete(indexKey).execute().actionGet(); + client.admin().indices().prepareDelete(indexKey).get(); } if (!exists || newdb) { client.admin().indices().create( @@ -138,8 +141,8 @@ public class ElasticsearchClient extends DB { client.admin().cluster().health(new ClusterHealthRequest().waitForGreenStatus()).actionGet(); } - private int parseIntegerProperty(Properties properties, String key, int defaultValue) { - String value = properties.getProperty(key); + private int parseIntegerProperty(final Properties properties, final String key, final int defaultValue) { + final String value = properties.getProperty(key); return value == null ? defaultValue : Integer.parseInt(value); } @@ -154,82 +157,96 @@ public class ElasticsearchClient extends DB { @Override public Status insert(String table, String key, Map<String, ByteIterator> values) { try { - final XContentBuilder doc = jsonBuilder().startObject(); + final XContentBuilder doc = jsonBuilder(); - for (Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) { + doc.startObject(); + for (final Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) { doc.field(entry.getKey(), entry.getValue()); } + doc.field("key", key); doc.endObject(); - client.prepareIndex(indexKey, table, key).setSource(doc).execute().actionGet(); + client.prepareIndex(indexKey, table).setSource(doc).execute().actionGet(); return Status.OK; - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); return Status.ERROR; } } @Override - public Status delete(String table, String key) { + public Status delete(final String table, final String key) { try { - DeleteResponse response = client.prepareDelete(indexKey, table, key).execute().actionGet(); - if (response.status().equals(RestStatus.NOT_FOUND)) { + final SearchResponse searchResponse = search(key); + if (searchResponse.getHits().totalHits == 0) { + return Status.NOT_FOUND; + } + + final String id = searchResponse.getHits().getAt(0).getId(); + + final DeleteResponse deleteResponse = client.prepareDelete(indexKey, table, id).execute().actionGet(); + if (deleteResponse.status().equals(RestStatus.NOT_FOUND)) { return Status.NOT_FOUND; } else { return Status.OK; } - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); return Status.ERROR; } } @Override - public Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) { + public Status read( + final String table, + final String key, + final Set<String> fields, + final Map<String, ByteIterator> result) { try { - final GetResponse response = client.prepareGet(indexKey, table, key).execute().actionGet(); + final SearchResponse searchResponse = search(key); + if (searchResponse.getHits().totalHits == 0) { + return Status.NOT_FOUND; + } - if (response.isExists()) { - if (fields != null) { - for (String field : fields) { - result.put(field, new StringByteIterator( - (String) response.getSource().get(field))); - } - } else { - for (String field : response.getSource().keySet()) { - result.put(field, new StringByteIterator( - (String) response.getSource().get(field))); - } + final SearchHit hit = searchResponse.getHits().getAt(0); + if (fields != null) { + for (String field : fields) { + result.put(field, new StringByteIterator( + (String) hit.getField(field).getValue())); } - return Status.OK; } else { - return Status.NOT_FOUND; + for (final Map.Entry<String, SearchHitField> e : hit.getFields().entrySet()) { + result.put(e.getKey(), new StringByteIterator((String) e.getValue().getValue())); + } } - } catch (Exception e) { + return Status.OK; + + } catch (final Exception e) { e.printStackTrace(); return Status.ERROR; } } @Override - public Status update(String table, String key, Map<String, ByteIterator> values) { + public Status update(final String table, final String key, final Map<String, ByteIterator> values) { try { - final GetResponse response = client.prepareGet(indexKey, table, key).execute().actionGet(); + final SearchResponse response = search(key); + if (response.getHits().totalHits == 0) { + return Status.NOT_FOUND; + } - if (response.isExists()) { - for (Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) { - response.getSource().put(entry.getKey(), entry.getValue()); - } + final SearchHit hit = response.getHits().getAt(0); + for (final Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) { + hit.getSource().put(entry.getKey(), entry.getValue()); + } - client.prepareIndex(indexKey, table, key).setSource(response.getSource()).execute().actionGet(); + client.prepareIndex(indexKey, table, key).setSource(hit.getSource()).get(); - return Status.OK; - } else { - return Status.NOT_FOUND; - } - } catch (Exception e) { + return Status.OK; + + } catch (final Exception e) { e.printStackTrace(); return Status.ERROR; } @@ -237,11 +254,40 @@ public class ElasticsearchClient extends DB { @Override public Status scan( - String table, - String startkey, - int recordcount, - Set<String> fields, - Vector<HashMap<String, ByteIterator>> result) { - return Status.NOT_IMPLEMENTED; + final String table, + final String startkey, + final int recordcount, + final Set<String> fields, + final Vector<HashMap<String, ByteIterator>> result) { + try { + final RangeQueryBuilder query = new RangeQueryBuilder("key").gte(startkey); + final SearchResponse response = client.prepareSearch(indexKey).setQuery(query).setSize(recordcount).get(); + + for (final SearchHit hit : response.getHits()) { + final HashMap<String, ByteIterator> entry; + if (fields != null) { + entry = new HashMap<>(fields.size()); + for (final String field : fields) { + entry.put(field, new StringByteIterator((String) hit.getSource().get(field))); + } + } else { + entry = new HashMap<>(hit.getFields().size()); + for (final Map.Entry<String, SearchHitField> field : hit.getFields().entrySet()) { + entry.put(field.getKey(), new StringByteIterator((String) field.getValue().getValue())); + } + } + result.add(entry); + } + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } } + + + private SearchResponse search(final String key) { + return client.prepareSearch(indexKey).setQuery(new TermQueryBuilder("key", key)).get(); + } + }