diff --git a/bin/bindings.properties b/bin/bindings.properties index a2aeb9a64bb6ef9997196ee67c74ce6eb1aa6eef..b37973905920a0be90734c5d1bc57c497d8f9b34 100644 --- a/bin/bindings.properties +++ b/bin/bindings.properties @@ -30,6 +30,7 @@ aerospike:com.yahoo.ycsb.db.AerospikeClient asynchbase:com.yahoo.ycsb.db.AsyncHBaseClient arangodb:com.yahoo.ycsb.db.ArangoDBClient arangodb3:com.yahoo.ycsb.db.arangodb.ArangoDB3Client +azuredocumentdb:com.yahoo.ycsb.db.azuredocumentdb.AzureDocumentDBClient azuretablestorage:com.yahoo.ycsb.db.azuretablestorage.AzureClient basic:com.yahoo.ycsb.BasicDB cassandra-cql:com.yahoo.ycsb.db.CassandraCQLClient @@ -37,9 +38,10 @@ cassandra2-cql:com.yahoo.ycsb.db.CassandraCQLClient cloudspanner:com.yahoo.ycsb.db.cloudspanner.CloudSpannerClient couchbase:com.yahoo.ycsb.db.CouchbaseClient couchbase2:com.yahoo.ycsb.db.couchbase2.Couchbase2Client -azuredocumentdb:com.yahoo.ycsb.db.azuredocumentdb.AzureDocumentDBClient dynamodb:com.yahoo.ycsb.db.DynamoDBClient elasticsearch:com.yahoo.ycsb.db.ElasticsearchClient +elasticsearch5:com.yahoo.ycsb.db.elasticsearch5.ElasticsearchClient +elasticsearch5-rest:com.yahoo.ycsb.db.elasticsearch5.ElasticsearchRestClient geode:com.yahoo.ycsb.db.GeodeClient googlebigtable:com.yahoo.ycsb.db.GoogleBigtableClient googledatastore:com.yahoo.ycsb.db.GoogleDatastoreClient diff --git a/distribution/pom.xml b/distribution/pom.xml index d3de7a92ebfca7389a202fbd4e8f3c3e57798b1d..65f0191df4f8421312ccaae00a05ccc51c9bde68 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -99,6 +99,11 @@ LICENSE file. <artifactId>elasticsearch-binding</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>elasticsearch5-binding</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>com.yahoo.ycsb</groupId> <artifactId>geode-binding</artifactId> diff --git a/elasticsearch/src/test/java/com/yahoo/ycsb/db/ElasticsearchClientTest.java b/elasticsearch/src/test/java/com/yahoo/ycsb/db/ElasticsearchClientTest.java index d1ad64d187e642cfd2bf2eabc50f853c478b2825..a14f1a23c8d46bf70eb2fa74728fd9bf0fa42330 100644 --- a/elasticsearch/src/test/java/com/yahoo/ycsb/db/ElasticsearchClientTest.java +++ b/elasticsearch/src/test/java/com/yahoo/ycsb/db/ElasticsearchClientTest.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2012 YCSB contributors. All rights reserved. + * Copyright (c) 2012-2017 YCSB contributors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You @@ -30,7 +30,6 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -44,15 +43,15 @@ import static org.junit.Assert.assertEquals; public class ElasticsearchClientTest { @ClassRule public final static TemporaryFolder temp = new TemporaryFolder(); - protected final static ElasticsearchClient instance = new ElasticsearchClient(); - protected final static HashMap<String, ByteIterator> MOCK_DATA; - protected final static String MOCK_TABLE = "MOCK_TABLE"; - protected final static String MOCK_KEY0 = "0"; - protected final static String MOCK_KEY1 = "1"; - protected final static String MOCK_KEY2 = "2"; + private final static ElasticsearchClient instance = new ElasticsearchClient(); + private final static HashMap<String, ByteIterator> MOCK_DATA; + private final static String MOCK_TABLE = "MOCK_TABLE"; + private final static String MOCK_KEY0 = "0"; + private final static String MOCK_KEY1 = "1"; + private final static String MOCK_KEY2 = "2"; static { - MOCK_DATA = new HashMap<String, ByteIterator>(10); + MOCK_DATA = new HashMap<>(10); for (int i = 1; i <= 10; i++) { MOCK_DATA.put("field" + i, new StringByteIterator("value" + i)); } @@ -88,7 +87,6 @@ public class ElasticsearchClientTest { */ @Test public void testInsert() { - System.out.println("insert"); Status result = instance.insert(MOCK_TABLE, MOCK_KEY0, MOCK_DATA); assertEquals(Status.OK, result); } @@ -98,7 +96,6 @@ public class ElasticsearchClientTest { */ @Test public void testDelete() { - System.out.println("delete"); Status result = instance.delete(MOCK_TABLE, MOCK_KEY1); assertEquals(Status.OK, result); } @@ -108,9 +105,8 @@ public class ElasticsearchClientTest { */ @Test public void testRead() { - System.out.println("read"); Set<String> fields = MOCK_DATA.keySet(); - HashMap<String, ByteIterator> resultParam = new HashMap<String, ByteIterator>(10); + HashMap<String, ByteIterator> resultParam = new HashMap<>(10); Status result = instance.read(MOCK_TABLE, MOCK_KEY1, fields, resultParam); assertEquals(Status.OK, result); } @@ -120,9 +116,8 @@ public class ElasticsearchClientTest { */ @Test public void testUpdate() { - System.out.println("update"); int i; - HashMap<String, ByteIterator> newValues = new HashMap<String, ByteIterator>(10); + HashMap<String, ByteIterator> newValues = new HashMap<>(10); for (i = 1; i <= 10; i++) { newValues.put("field" + i, new StringByteIterator("newvalue" + i)); @@ -132,13 +127,12 @@ public class ElasticsearchClientTest { assertEquals(Status.OK, result); //validate that the values changed - HashMap<String, ByteIterator> resultParam = new HashMap<String, ByteIterator>(10); + HashMap<String, ByteIterator> resultParam = new HashMap<>(10); instance.read(MOCK_TABLE, MOCK_KEY1, MOCK_DATA.keySet(), resultParam); for (i = 1; i <= 10; i++) { assertEquals("newvalue" + i, resultParam.get("field" + i).toString()); } - } /** @@ -146,10 +140,9 @@ public class ElasticsearchClientTest { */ @Test public void testScan() { - System.out.println("scan"); int recordcount = 10; Set<String> fields = MOCK_DATA.keySet(); - Vector<HashMap<String, ByteIterator>> resultParam = new Vector<HashMap<String, ByteIterator>>(10); + Vector<HashMap<String, ByteIterator>> resultParam = new Vector<>(10); Status result = instance.scan(MOCK_TABLE, MOCK_KEY1, recordcount, fields, resultParam); assertEquals(Status.OK, result); } diff --git a/elasticsearch5/README.md b/elasticsearch5/README.md new file mode 100644 index 0000000000000000000000000000000000000000..015d15110b006bfa7573454dafadfd7197bc37d6 --- /dev/null +++ b/elasticsearch5/README.md @@ -0,0 +1,113 @@ +<!-- +Copyright (c) 2017 YCSB contributors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +## Quick Start + +This section describes how to run YCSB on Elasticsearch 5.x running locally. + +### 1. Install and Start Elasticsearch + +[Download and install Elasticsearch][1]. When starting Elasticsearch, you should +[configure][2] the cluster name to be `es.ycsb.cluster` (see below). + +### 2. Set Up YCSB + +Clone the YCSB git repository and compile: + + git clone git://github.com/brianfrankcooper/YCSB.git + cd YCSB + mvn clean package + +### 3. Run YCSB + +Now you are ready to run! First, load the data: + + ./bin/ycsb load elasticsearch5 -s -P workloads/workloada + +Then, run the workload: + + ./bin/ycsb run elasticsearch5 -s -P workloads/workloada + +The Elasticsearch 5 binding requires a standalone instance of Elasticsearch. +You must specify a hosts list for the transport client to connect to via +`-p es.hosts.list=<hostname1:port1>,...,<hostnamen:portn>`: + + ./bin/ycsb run elasticsearch5 -s -P workloads/workloada \ + -p es.hosts.list=<hostname1:port1>,...,<hostnamen:portn>` + +Note that `es.hosts.list` defaults to `localhost:9300`. For further +configuration see below: + +### Defaults Configuration +The default setting for the Elasticsearch node that is created is as follows: + + es.setting.cluster.name=es.ycsb.cluster + es.index.key=es.ycsb + es.number_of_shards=1 + es.number_of_replicas=0 + es.new_index=false + es.hosts.list=localhost:9300 + +### Custom Configuration +If you wish to customize the settings used to create the Elasticsearch node +you can created a new property file that contains your desired Elasticsearch +node settings and pass it in via the parameter to 'bin/ycsb' script. Note that +the default properties will be kept if you don't explicitly overwrite them. + +Assuming that we have a properties file named "myproperties.data" that contains +custom Elasticsearch node configuration you can execute the following to +pass it into the Elasticsearch client: + + ./bin/ycsb run elasticsearch5 -P workloads/workloada -P myproperties.data -s + +If you wish to change the default index name you can set the following property: + + es.index.key=my_index_key + +By default this will use localhost:9300 as a seed node to discover the cluster. +You can also specify + + es.hosts.list=(\w+:\d+)+ + +(a comma-separated list of host/port pairs) to change this. + +### Configuring the transport client + +The `elasticsearch5` binding starts a transport client to connect to +Elasticsearch using the transport protocol. You can pass arbitrary settings to +this instance by using properties with the prefix `es.setting.` followed by any +valid Elasticsearch setting. For example, assuming that you started your +Elasticsearch node with the cluster name `my-elasticsearch-cluster`, you would +need to configure the transport client to use the same cluster name via + + ./bin/ycsb run elasticsearch5 -P <workload> \ + -p es.setting.cluster.name=my-elasticsearch-cluster + +### Using the Elasticsearch low-level REST client + +The Elasticsearch 5 bindings also ship with an implementation that uses the +low-level Elasticsearch REST client. The name of this binding is +`elasticsearch-rest`. For example: + + ./bin/ycsb load elasticsearch5-rest -P workloads/workloada + +You can configure the hosts to connect to via the same `es.hosts.list` property +used to configure the transport client in the `elasticsearch5` binding (note +that by default you should use port 9200) + +[1]: https://www.elastic.co/guide/en/elasticsearch/reference/5.5/_installation.html +[2]: https://www.elastic.co/guide/en/elasticsearch/reference/5.5/settings.html diff --git a/elasticsearch5/pom.xml b/elasticsearch5/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..b537566f58699bc68ae060083f40621ad36662fd --- /dev/null +++ b/elasticsearch5/pom.xml @@ -0,0 +1,225 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Copyright (c) 2017 YCSB contributors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>binding-parent</artifactId> + <version>0.13.0-SNAPSHOT</version> + <relativePath>../binding-parent</relativePath> + </parent> + + <properties> + <!-- Skip tests by default. will be activated by jdk8 profile --> + <skipTests>true</skipTests> + <elasticsearch.groupid>org.elasticsearch.distribution.zip</elasticsearch.groupid> + + <!-- For integration tests using ANT --> + <integ.http.port>9400</integ.http.port> + <integ.transport.port>9500</integ.transport.port> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.10</version> + <executions> + <execution> + <id>integ-setup-dependencies</id> + <phase>pre-integration-test</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <skip>${skipTests}</skip> + <artifactItems> + <artifactItem> + <groupId>${elasticsearch.groupid}</groupId> + <artifactId>elasticsearch</artifactId> + <version>${elasticsearch5-version}</version> + <type>zip</type> + </artifactItem> + </artifactItems> + <useBaseVersion>true</useBaseVersion> + <outputDirectory>${project.build.directory}/integration-tests/binaries</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19</version> + <executions> + <execution> + <id>default-test</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>com.carrotsearch.randomizedtesting</groupId> + <artifactId>junit4-maven-plugin</artifactId> + <version>2.3.3</version> + + <configuration> + <assertions enableSystemAssertions="false"> + <enable/> + </assertions> + + <listeners> + <report-text /> + </listeners> + </configuration> + + <executions> + <execution> + <id>unit-tests</id> + <phase>test</phase> + <goals> + <goal>junit4</goal> + </goals> + <inherited>true</inherited> + <configuration> + <skipTests>${skipTests}</skipTests> + <includes> + <include>**/*Test.class</include> + </includes> + <excludes> + <exclude>**/*$*</exclude> + </excludes> + </configuration> + </execution> + <execution> + <id>integration-tests</id> + <phase>integration-test</phase> + <goals> + <goal>junit4</goal> + </goals> + <inherited>true</inherited> + <configuration> + <skipTests>${skipTests}</skipTests> + <includes> + <include>**/*IT.class</include> + </includes> + <excludes> + <exclude>**/*$*</exclude> + </excludes> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <artifactId>elasticsearch5-binding</artifactId> + <name>Elasticsearch 5.x Binding</name> + <packaging>jar</packaging> + <dependencies> + <dependency> + <!-- jna is supported in ES and will be used when provided + otherwise a fallback is used --> + <groupId>org.elasticsearch</groupId> + <artifactId>jna</artifactId> + <version>4.4.0</version> + </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>transport</artifactId> + <version>${elasticsearch5-version}</version> + </dependency> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>rest</artifactId> + <version>${elasticsearch5-version}</version> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>2.8.2</version> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>2.8.2</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + </dependencies> + + <profiles> + <!-- Requires JDK8 to run, so none of our tests + will work unless we're using jdk8. + --> + <profile> + <id>jdk8-tests</id> + <activation> + <jdk>1.8</jdk> + </activation> + <build> + <plugins> + <plugin> + <groupId>com.github.alexcojocaru</groupId> + <artifactId>elasticsearch-maven-plugin</artifactId> + <version>5.9</version> + <configuration> + <version>${elasticsearch5-version}</version> + <clusterName>test</clusterName> + <httpPort>9200</httpPort> + <transportPort>9300</transportPort> + </configuration> + <executions> + <execution> + <id>start-elasticsearch</id> + <phase>pre-integration-test</phase> + <goals> + <goal>runforked</goal> + </goals> + </execution> + <execution> + <id>stop-elasticsearch</id> + <phase>post-integration-test</phase> + <goals> + <goal>stop</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + <properties> + <skipTests>false</skipTests> + </properties> + </profile> + </profiles> +</project> diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/Elasticsearch5.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/Elasticsearch5.java new file mode 100644 index 0000000000000000000000000000000000000000..a67e0447be9ddf42b3c78011415cf6317eaf780b --- /dev/null +++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/Elasticsearch5.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2017 YCSB contributors. All rights reserved. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.elasticsearch5; + +import java.util.Properties; + +final class Elasticsearch5 { + + private Elasticsearch5() { + + } + + static final String KEY = "key"; + + static int parseIntegerProperty(final Properties properties, final String key, final int defaultValue) { + final String value = properties.getProperty(key); + return value == null ? defaultValue : Integer.parseInt(value); + } + +} diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java new file mode 100644 index 0000000000000000000000000000000000000000..aecee472f6e9b2e26d12021158d7c71aa6bf6ffc --- /dev/null +++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClient.java @@ -0,0 +1,333 @@ +/* + * Copyright (c) 2017 YCSB contributors. All rights reserved. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.elasticsearch5; + +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Requests; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.transport.client.PreBuiltTransportClient; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.Vector; + +import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.KEY; +import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.parseIntegerProperty; +import static org.elasticsearch.common.settings.Settings.Builder; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * Elasticsearch client for YCSB framework. + */ +public class ElasticsearchClient extends DB { + + private static final String DEFAULT_CLUSTER_NAME = "es.ycsb.cluster"; + private static final String DEFAULT_INDEX_KEY = "es.ycsb"; + private static final String DEFAULT_REMOTE_HOST = "localhost:9300"; + private static final int NUMBER_OF_SHARDS = 1; + private static final int NUMBER_OF_REPLICAS = 0; + private TransportClient client; + private String indexKey; + + /** + * + * Initialize any state for this DB. Called once per DB instance; there is one + * DB instance per client thread. + */ + @Override + public void init() throws DBException { + final Properties props = getProperties(); + + this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY); + + final int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS); + final int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS); + + final Boolean newIndex = Boolean.parseBoolean(props.getProperty("es.new_index", "false")); + final Builder settings = Settings.builder().put("cluster.name", DEFAULT_CLUSTER_NAME); + + // if properties file contains elasticsearch user defined properties + // add it to the settings file (will overwrite the defaults). + for (final Entry<Object, Object> e : props.entrySet()) { + if (e.getKey() instanceof String) { + final String key = (String) e.getKey(); + if (key.startsWith("es.setting.")) { + settings.put(key.substring("es.setting.".length()), e.getValue()); + } + } + } + + settings.put("client.transport.sniff", true) + .put("client.transport.ignore_cluster_name", false) + .put("client.transport.ping_timeout", "30s") + .put("client.transport.nodes_sampler_interval", "30s"); + // Default it to localhost:9300 + final String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(","); + client = new PreBuiltTransportClient(settings.build()); + for (String h : nodeList) { + String[] nodes = h.split(":"); + + final InetAddress address; + try { + address = InetAddress.getByName(nodes[0]); + } catch (UnknownHostException e) { + throw new IllegalArgumentException("unable to identity host [" + nodes[0]+ "]", e); + } + final int port; + try { + port = Integer.parseInt(nodes[1]); + } catch (final NumberFormatException e) { + throw new IllegalArgumentException("unable to parse port [" + nodes[1] + "]", e); + } + client.addTransportAddress(new InetSocketTransportAddress(address, port)); + } + + final boolean exists = + client.admin().indices() + .exists(Requests.indicesExistsRequest(indexKey)).actionGet() + .isExists(); + if (exists && newIndex) { + client.admin().indices().prepareDelete(indexKey).get(); + } + if (!exists || newIndex) { + client.admin().indices().create( + new CreateIndexRequest(indexKey) + .settings( + Settings.builder() + .put("index.number_of_shards", numberOfShards) + .put("index.number_of_replicas", numberOfReplicas) + )).actionGet(); + } + client.admin().cluster().health(new ClusterHealthRequest().waitForGreenStatus()).actionGet(); + } + + @Override + public void cleanup() throws DBException { + if (client != null) { + client.close(); + client = null; + } + } + + private volatile boolean isRefreshNeeded = false; + + @Override + public Status insert(String table, String key, Map<String, ByteIterator> values) { + try (XContentBuilder doc = jsonBuilder()) { + + doc.startObject(); + for (final Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) { + doc.field(entry.getKey(), entry.getValue()); + } + doc.field(KEY, key); + doc.endObject(); + + final IndexResponse indexResponse = client.prepareIndex(indexKey, table).setSource(doc).get(); + if (indexResponse.getResult() != DocWriteResponse.Result.CREATED) { + return Status.ERROR; + } + + if (!isRefreshNeeded) { + synchronized (this) { + isRefreshNeeded = true; + } + } + + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } + } + + @Override + public Status delete(final String table, final String key) { + try { + final SearchResponse searchResponse = search(table, key); + if (searchResponse.getHits().totalHits == 0) { + return Status.NOT_FOUND; + } + + final String id = searchResponse.getHits().getAt(0).getId(); + + final DeleteResponse deleteResponse = client.prepareDelete(indexKey, table, id).get(); + if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { + return Status.NOT_FOUND; + } + + if (!isRefreshNeeded) { + synchronized (this) { + isRefreshNeeded = true; + } + } + + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } + } + + @Override + public Status read( + final String table, + final String key, + final Set<String> fields, + final Map<String, ByteIterator> result) { + try { + final SearchResponse searchResponse = search(table, key); + if (searchResponse.getHits().totalHits == 0) { + return Status.NOT_FOUND; + } + + final SearchHit hit = searchResponse.getHits().getAt(0); + if (fields != null) { + for (final String field : fields) { + result.put(field, new StringByteIterator((String) hit.getSource().get(field))); + } + } else { + for (final Map.Entry<String, Object> e : hit.getSource().entrySet()) { + if (KEY.equals(e.getKey())) { + continue; + } + result.put(e.getKey(), new StringByteIterator((String) e.getValue())); + } + } + + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } + } + + @Override + public Status update(final String table, final String key, final Map<String, ByteIterator> values) { + try { + final SearchResponse response = search(table, key); + if (response.getHits().totalHits == 0) { + return Status.NOT_FOUND; + } + + final SearchHit hit = response.getHits().getAt(0); + for (final Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) { + hit.getSource().put(entry.getKey(), entry.getValue()); + } + + final IndexResponse indexResponse = + client.prepareIndex(indexKey, table, hit.getId()).setSource(hit.getSource()).get(); + + if (indexResponse.getResult() != DocWriteResponse.Result.UPDATED) { + return Status.ERROR; + } + + if (!isRefreshNeeded) { + synchronized (this) { + isRefreshNeeded = true; + } + } + + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } + } + + @Override + public Status scan( + final String table, + final String startkey, + final int recordcount, + final Set<String> fields, + final Vector<HashMap<String, ByteIterator>> result) { + try { + refreshIfNeeded(); + final RangeQueryBuilder query = new RangeQueryBuilder(KEY).gte(startkey); + final SearchResponse response = client.prepareSearch(indexKey).setQuery(query).setSize(recordcount).get(); + + for (final SearchHit hit : response.getHits()) { + final HashMap<String, ByteIterator> entry; + if (fields != null) { + entry = new HashMap<>(fields.size()); + for (final String field : fields) { + entry.put(field, new StringByteIterator((String) hit.getSource().get(field))); + } + } else { + entry = new HashMap<>(hit.getSource().size()); + for (final Map.Entry<String, Object> field : hit.getSource().entrySet()) { + if (KEY.equals(field.getKey())) { + continue; + } + entry.put(field.getKey(), new StringByteIterator((String) field.getValue())); + } + } + result.add(entry); + } + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } + } + + private void refreshIfNeeded() { + if (isRefreshNeeded) { + final boolean refresh; + synchronized (this) { + if (isRefreshNeeded) { + refresh = true; + isRefreshNeeded = false; + } else { + refresh = false; + } + } + if (refresh) { + client.admin().indices().refresh(new RefreshRequest()).actionGet(); + } + } + } + + private SearchResponse search(final String table, final String key) { + refreshIfNeeded(); + return client.prepareSearch(indexKey).setTypes(table).setQuery(new TermQueryBuilder(KEY, key)).get(); + } + +} diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java new file mode 100644 index 0000000000000000000000000000000000000000..884244803867f86abe9564501e745cfcb75978cf --- /dev/null +++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClient.java @@ -0,0 +1,444 @@ +/* + * Copyright (c) 2017 YCSB contributors. All rights reserved. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.elasticsearch5; + +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.HttpStatus; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.message.BasicHeader; +import org.apache.http.nio.entity.NStringEntity; +import org.codehaus.jackson.map.ObjectMapper; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Vector; + +import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.KEY; +import static com.yahoo.ycsb.db.elasticsearch5.Elasticsearch5.parseIntegerProperty; +import static java.util.Collections.emptyMap; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * Elasticsearch REST client for YCSB framework. + */ +public class ElasticsearchRestClient extends DB { + + private static final String DEFAULT_INDEX_KEY = "es.ycsb"; + private static final String DEFAULT_REMOTE_HOST = "localhost:9200"; + private static final int NUMBER_OF_SHARDS = 1; + private static final int NUMBER_OF_REPLICAS = 0; + private RestClient restClient; + private String indexKey; + + /** + * + * Initialize any state for this DB. Called once per DB instance; there is one + * DB instance per client thread. + */ + @Override + public void init() throws DBException { + final Properties props = getProperties(); + + this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY); + + final int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS); + final int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS); + + final Boolean newIndex = Boolean.parseBoolean(props.getProperty("es.new_index", "false")); + + final String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(","); + + final List<HttpHost> esHttpHosts = new ArrayList<>(nodeList.length); + for (String h : nodeList) { + String[] nodes = h.split(":"); + esHttpHosts.add(new HttpHost(nodes[0], Integer.valueOf(nodes[1]), "http")); + } + + restClient = RestClient.builder(esHttpHosts.toArray(new HttpHost[esHttpHosts.size()])).build(); + + final Response existsResponse = performRequest(restClient, "HEAD", "/" + indexKey); + final boolean exists = existsResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK; + + if (exists && newIndex) { + final Response deleteResponse = performRequest(restClient, "DELETE", "/" + indexKey); + final int statusCode = deleteResponse.getStatusLine().getStatusCode(); + if (statusCode != HttpStatus.SC_OK) { + throw new DBException("delete [" + indexKey + "] failed with status [" + statusCode + "]"); + } + } + + if (!exists || newIndex) { + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + builder.startObject("settings"); + builder.field("index.number_of_shards", numberOfShards); + builder.field("index.number_of_replicas", numberOfReplicas); + builder.endObject(); + builder.endObject(); + final Map<String, String> params = emptyMap(); + final StringEntity entity = new StringEntity(builder.string()); + final Response createResponse = performRequest(restClient, "PUT", "/" + indexKey, params, entity); + final int statusCode = createResponse.getStatusLine().getStatusCode(); + if (statusCode != HttpStatus.SC_OK) { + throw new DBException("create [" + indexKey + "] failed with status [" + statusCode + "]"); + } + } catch (final IOException e) { + throw new DBException(e); + } + } + + final Map<String, String> params = Collections.singletonMap("wait_for_status", "green"); + final Response healthResponse = performRequest(restClient, "GET", "/_cluster/health/" + indexKey, params); + final int healthStatusCode = healthResponse.getStatusLine().getStatusCode(); + if (healthStatusCode != HttpStatus.SC_OK) { + throw new DBException("cluster health [" + indexKey + "] failed with status [" + healthStatusCode + "]"); + } + } + + private static Response performRequest( + final RestClient restClient, + final String method, + final String endpoint) throws DBException { + final Map<String, String> params = emptyMap(); + return performRequest(restClient, method, endpoint, params); + } + + private static Response performRequest( + final RestClient restClient, + final String method, + final String endpoint, + final Map<String, String> params) throws DBException { + return performRequest(restClient, method, endpoint, params, null); + } + + private static final Header[] EMPTY_HEADERS = new Header[0]; + + private static Response performRequest( + final RestClient restClient, + final String method, + final String endpoint, + final Map<String, String> params, + final HttpEntity entity) throws DBException { + try { + final Header[] headers; + if (entity != null) { + headers = new Header[]{new BasicHeader("content-type", ContentType.APPLICATION_JSON.toString())}; + } else { + headers = EMPTY_HEADERS; + } + return restClient.performRequest( + method, + endpoint, + params, + entity, + headers); + } catch (final IOException e) { + e.printStackTrace(); + throw new DBException(e); + } + } + + @Override + public void cleanup() throws DBException { + if (restClient != null) { + try { + restClient.close(); + restClient = null; + } catch (final IOException e) { + throw new DBException(e); + } + } + } + + private volatile boolean isRefreshNeeded = false; + + @Override + public Status insert(final String table, final String key, final Map<String, ByteIterator> values) { + try { + final Map<String, String> data = StringByteIterator.getStringMap(values); + data.put(KEY, key); + + final Response response = restClient.performRequest( + "POST", + "/" + indexKey + "/" + table + "/", + Collections.<String, String>emptyMap(), + new NStringEntity(new ObjectMapper().writeValueAsString(data), ContentType.APPLICATION_JSON)); + + if (response.getStatusLine().getStatusCode() != HttpStatus.SC_CREATED) { + return Status.ERROR; + } + + if (!isRefreshNeeded) { + synchronized (this) { + isRefreshNeeded = true; + } + } + + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } + } + + @Override + public Status delete(final String table, final String key) { + try { + final Response searchResponse = search(table, key); + final int statusCode = searchResponse.getStatusLine().getStatusCode(); + if (statusCode == HttpStatus.SC_NOT_FOUND) { + return Status.NOT_FOUND; + } else if (statusCode != HttpStatus.SC_OK) { + return Status.ERROR; + } + + final Map<String, Object> map = map(searchResponse); + @SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>)map.get("hits"); + final int total = (int)hits.get("total"); + if (total == 0) { + return Status.NOT_FOUND; + } + @SuppressWarnings("unchecked") final Map<String, Object> hit = + (Map<String, Object>)((List<Object>)hits.get("hits")).get(0); + final Response deleteResponse = + restClient.performRequest("DELETE", "/" + indexKey + "/" + table + "/" + hit.get("_id")); + if (deleteResponse.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + return Status.ERROR; + } + + if (!isRefreshNeeded) { + synchronized (this) { + isRefreshNeeded = true; + } + } + + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } + } + + @Override + public Status read( + final String table, + final String key, + final Set<String> fields, + final Map<String, ByteIterator> result) { + try { + final Response searchResponse = search(table, key); + final int statusCode = searchResponse.getStatusLine().getStatusCode(); + if (statusCode == 404) { + return Status.NOT_FOUND; + } else if (statusCode != HttpStatus.SC_OK) { + return Status.ERROR; + } + + final Map<String, Object> map = map(searchResponse); + @SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>)map.get("hits"); + final int total = (int)hits.get("total"); + if (total == 0) { + return Status.NOT_FOUND; + } + @SuppressWarnings("unchecked") final Map<String, Object> hit = + (Map<String, Object>)((List<Object>)hits.get("hits")).get(0); + @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source"); + if (fields != null) { + for (final String field : fields) { + result.put(field, new StringByteIterator((String) source.get(field))); + } + } else { + for (final Map.Entry<String, Object> e : source.entrySet()) { + if (KEY.equals(e.getKey())) { + continue; + } + result.put(e.getKey(), new StringByteIterator((String) e.getValue())); + } + } + + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } + } + + @Override + public Status update(final String table, final String key, final Map<String, ByteIterator> values) { + try { + final Response searchResponse = search(table, key); + final int statusCode = searchResponse.getStatusLine().getStatusCode(); + if (statusCode == 404) { + return Status.NOT_FOUND; + } else if (statusCode != HttpStatus.SC_OK) { + return Status.ERROR; + } + + final Map<String, Object> map = map(searchResponse); + @SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>) map.get("hits"); + final int total = (int) hits.get("total"); + if (total == 0) { + return Status.NOT_FOUND; + } + @SuppressWarnings("unchecked") final Map<String, Object> hit = + (Map<String, Object>) ((List<Object>) hits.get("hits")).get(0); + @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>) hit.get("_source"); + for (final Map.Entry<String, String> entry : StringByteIterator.getStringMap(values).entrySet()) { + source.put(entry.getKey(), entry.getValue()); + } + final Map<String, String> params = emptyMap(); + final Response response = restClient.performRequest( + "PUT", + "/" + indexKey + "/" + table + "/" + hit.get("_id"), + params, + new NStringEntity(new ObjectMapper().writeValueAsString(source), ContentType.APPLICATION_JSON)); + if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + return Status.ERROR; + } + + if (!isRefreshNeeded) { + synchronized (this) { + isRefreshNeeded = true; + } + } + + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } + } + + @Override + public Status scan( + final String table, + final String startkey, + final int recordcount, + final Set<String> fields, + final Vector<HashMap<String, ByteIterator>> result) { + try { + final Response response; + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + builder.startObject("query"); + builder.startObject("range"); + builder.startObject(KEY); + builder.field("gte", startkey); + builder.endObject(); + builder.endObject(); + builder.endObject(); + builder.field("size", recordcount); + builder.endObject(); + response = search(table, builder); + @SuppressWarnings("unchecked") final Map<String, Object> map = map(response); + @SuppressWarnings("unchecked") final Map<String, Object> hits = (Map<String, Object>)map.get("hits"); + @SuppressWarnings("unchecked") final List<Map<String, Object>> list = + (List<Map<String, Object>>) hits.get("hits"); + + for (final Map<String, Object> hit : list) { + @SuppressWarnings("unchecked") final Map<String, Object> source = (Map<String, Object>)hit.get("_source"); + final HashMap<String, ByteIterator> entry; + if (fields != null) { + entry = new HashMap<>(fields.size()); + for (final String field : fields) { + entry.put(field, new StringByteIterator((String) source.get(field))); + } + } else { + entry = new HashMap<>(hit.size()); + for (final Map.Entry<String, Object> field : source.entrySet()) { + if (KEY.equals(field.getKey())) { + continue; + } + entry.put(field.getKey(), new StringByteIterator((String) field.getValue())); + } + } + result.add(entry); + } + } + return Status.OK; + } catch (final Exception e) { + e.printStackTrace(); + return Status.ERROR; + } + } + + private void refreshIfNeeded() throws IOException { + if (isRefreshNeeded) { + final boolean refresh; + synchronized (this) { + if (isRefreshNeeded) { + refresh = true; + isRefreshNeeded = false; + } else { + refresh = false; + } + } + if (refresh) { + restClient.performRequest("POST", "/" + indexKey + "/_refresh"); + } + } + } + + private Response search(final String table, final String key) throws IOException { + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + builder.startObject("query"); + builder.startObject("term"); + builder.field(KEY, key); + builder.endObject(); + builder.endObject(); + builder.endObject(); + return search(table, builder); + } + } + + private Response search(final String table, final XContentBuilder builder) throws IOException { + refreshIfNeeded(); + final Map<String, String> params = emptyMap(); + final StringEntity entity = new StringEntity(builder.string()); + final Header header = new BasicHeader("content-type", ContentType.APPLICATION_JSON.toString()); + return restClient.performRequest("GET", "/" + indexKey + "/" + table + "/_search", params, entity, header); + } + + private Map<String, Object> map(final Response response) throws IOException { + try (InputStream is = response.getEntity().getContent()) { + final ObjectMapper mapper = new ObjectMapper(); + @SuppressWarnings("unchecked") final Map<String, Object> map = mapper.readValue(is, Map.class); + return map; + } + } + +} diff --git a/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/package-info.java b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..6fef091471263b2c2afd4aa12f571d1bb8bc9382 --- /dev/null +++ b/elasticsearch5/src/main/java/com/yahoo/ycsb/db/elasticsearch5/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2017 YCSB Contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + * The YCSB binding for + * <a href="https://www.elastic.co/products/elasticsearch">Elasticsearch</a>. + */ +package com.yahoo.ycsb.db.elasticsearch5; + diff --git a/elasticsearch5/src/main/resources/log4j2.properties b/elasticsearch5/src/main/resources/log4j2.properties new file mode 100644 index 0000000000000000000000000000000000000000..dc3e1ceb5a1e6c8c65d392698126fda0300f8373 --- /dev/null +++ b/elasticsearch5/src/main/resources/log4j2.properties @@ -0,0 +1,8 @@ +appender.console.type = Console +appender.console.name = console +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] %marker%m%n +appender.console.targetStr = SYSTEM_ERR + +rootLogger.level = info +rootLogger.appenderRef.console.ref = console diff --git a/elasticsearch5/src/test/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClientIT.java b/elasticsearch5/src/test/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClientIT.java new file mode 100644 index 0000000000000000000000000000000000000000..07dab6fa6af9095c361ab1b9a297ec207d9a9ca2 --- /dev/null +++ b/elasticsearch5/src/test/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchClientIT.java @@ -0,0 +1,29 @@ +/** + * Copyright (c) 2017 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.elasticsearch5; + +import com.yahoo.ycsb.DB; + +public class ElasticsearchClientIT extends ElasticsearchIntegTestBase { + + @Override + DB newDB() { + return new ElasticsearchClient(); + } + +} diff --git a/elasticsearch5/src/test/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchIntegTestBase.java b/elasticsearch5/src/test/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchIntegTestBase.java new file mode 100644 index 0000000000000000000000000000000000000000..dda766339d02cc2f96261f047d219bd6b4ac56e5 --- /dev/null +++ b/elasticsearch5/src/test/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchIntegTestBase.java @@ -0,0 +1,119 @@ +package com.yahoo.ycsb.db.elasticsearch5; + +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.Client; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Properties; +import java.util.Set; +import java.util.Vector; + +import static org.junit.Assert.assertEquals; + +public abstract class ElasticsearchIntegTestBase { + + private DB db; + + abstract DB newDB(); + + private final static HashMap<String, ByteIterator> MOCK_DATA; + private final static String MOCK_TABLE = "MOCK_TABLE"; + + static { + MOCK_DATA = new HashMap<>(10); + for (int i = 1; i <= 10; i++) { + MOCK_DATA.put("field" + i, new StringByteIterator("value" + i)); + } + } + + @Before + public void setUp() throws DBException { + final Properties props = new Properties(); + props.put("es.new_index", "true"); + props.put("es.setting.cluster.name", "test"); + db = newDB(); + db.setProperties(props); + db.init(); + for (int i = 0; i < 16; i++) { + db.insert(MOCK_TABLE, Integer.toString(i), MOCK_DATA); + } + } + + @After + public void tearDown() throws DBException { + db.cleanup(); + } + + @Test + public void testInsert() { + final Status result = db.insert(MOCK_TABLE, "0", MOCK_DATA); + assertEquals(Status.OK, result); + } + + /** + * Test of delete method, of class ElasticsearchClient. + */ + @Test + public void testDelete() { + final Status result = db.delete(MOCK_TABLE, "1"); + assertEquals(Status.OK, result); + } + + /** + * Test of read method, of class ElasticsearchClient. + */ + @Test + public void testRead() { + final Set<String> fields = MOCK_DATA.keySet(); + final HashMap<String, ByteIterator> resultParam = new HashMap<>(10); + final Status result = db.read(MOCK_TABLE, "1", fields, resultParam); + assertEquals(Status.OK, result); + } + + /** + * Test of update method, of class ElasticsearchClient. + */ + @Test + public void testUpdate() { + final HashMap<String, ByteIterator> newValues = new HashMap<>(10); + + for (int i = 1; i <= 10; i++) { + newValues.put("field" + i, new StringByteIterator("newvalue" + i)); + } + + final Status updateResult = db.update(MOCK_TABLE, "1", newValues); + assertEquals(Status.OK, updateResult); + + // validate that the values changed + final HashMap<String, ByteIterator> resultParam = new HashMap<>(10); + final Status readResult = db.read(MOCK_TABLE, "1", MOCK_DATA.keySet(), resultParam); + assertEquals(Status.OK, readResult); + + for (int i = 1; i <= 10; i++) { + assertEquals("newvalue" + i, resultParam.get("field" + i).toString()); + } + + } + + /** + * Test of scan method, of class ElasticsearchClient. + */ + @Test + public void testScan() { + final int recordcount = 10; + final Set<String> fields = MOCK_DATA.keySet(); + final Vector<HashMap<String, ByteIterator>> resultParam = new Vector<>(10); + final Status result = db.scan(MOCK_TABLE, "1", recordcount, fields, resultParam); + assertEquals(Status.OK, result); + + assertEquals(10, resultParam.size()); + } + +} diff --git a/elasticsearch5/src/test/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClientIT.java b/elasticsearch5/src/test/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClientIT.java new file mode 100644 index 0000000000000000000000000000000000000000..10b164b0421b16a270444a26754d07b29de38ac5 --- /dev/null +++ b/elasticsearch5/src/test/java/com/yahoo/ycsb/db/elasticsearch5/ElasticsearchRestClientIT.java @@ -0,0 +1,29 @@ +/** + * Copyright (c) 2017 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.elasticsearch5; + +import com.yahoo.ycsb.DB; + +public class ElasticsearchRestClientIT extends ElasticsearchIntegTestBase { + + @Override + DB newDB() { + return new ElasticsearchRestClient(); + } + +} diff --git a/pom.xml b/pom.xml index 4b2e91c5c90fcca67a66330b2b9869e8c97eb9e9..aab6c89cf2012db1127ad2e9429d9b13c464e7a7 100644 --- a/pom.xml +++ b/pom.xml @@ -91,6 +91,7 @@ LICENSE file. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <thrift.version>0.8.0</thrift.version> <hypertable.version>0.9.5.6</hypertable.version> + <elasticsearch5-version>5.5.1</elasticsearch5-version> <couchbase.version>1.4.10</couchbase.version> <couchbase2.version>2.3.1</couchbase2.version> <tarantool.version>1.6.5</tarantool.version> @@ -123,6 +124,7 @@ LICENSE file. <module>azuredocumentdb</module> <module>dynamodb</module> <module>elasticsearch</module> + <module>elasticsearch5</module> <module>geode</module> <module>googlebigtable</module> <module>googledatastore</module>