From f513f5b46695d8881827d41bcbae051bfe53f1d2 Mon Sep 17 00:00:00 2001 From: Ilya Suntsov <isuntsov@gridgain.com> Date: Tue, 3 Oct 2017 19:22:59 +0300 Subject: [PATCH] [ignite, ignite-sql] Add Apache Ignite clients 'ignite' and 'ignite-sql' (#1118) Closes #1118 Closes #1165 Co-authored-by: Sergey Puchnin <spuchnin@gridgain.com> Co-authored-by: Taras Ledkov <tledkov@gridgain.com> Co-authored-by: Oleg Ostanin <oostanin@gridagin.com> --- .travis.yml | 2 + bin/bindings.properties | 2 + bin/ycsb | 2 + distribution/pom.xml | 5 + ignite/README.md | 70 ++++ ignite/pom.xml | 104 ++++++ ignite/resources/config/README.md | 18 + ignite/resources/config/ignite-sql.xml | 121 +++++++ ignite/resources/config/ignite.xml | 94 ++++++ .../ycsb/db/ignite/IgniteAbstractClient.java | 151 +++++++++ .../yahoo/ycsb/db/ignite/IgniteClient.java | 252 ++++++++++++++ .../yahoo/ycsb/db/ignite/IgniteSqlClient.java | 299 +++++++++++++++++ .../yahoo/ycsb/db/ignite/package-info.java | 23 ++ ignite/src/main/resources/log4j2.xml | 25 ++ .../db/ignite/IgniteClientCommonTest.java | 40 +++ .../ycsb/db/ignite/IgniteClientTest.java | 207 ++++++++++++ .../ycsb/db/ignite/IgniteSqlClientTest.java | 313 ++++++++++++++++++ pom.xml | 2 + 18 files changed, 1730 insertions(+) create mode 100644 ignite/README.md create mode 100644 ignite/pom.xml create mode 100644 ignite/resources/config/README.md create mode 100644 ignite/resources/config/ignite-sql.xml create mode 100644 ignite/resources/config/ignite.xml create mode 100644 ignite/src/main/java/com/yahoo/ycsb/db/ignite/IgniteAbstractClient.java create mode 100644 ignite/src/main/java/com/yahoo/ycsb/db/ignite/IgniteClient.java create mode 100644 ignite/src/main/java/com/yahoo/ycsb/db/ignite/IgniteSqlClient.java create mode 100644 ignite/src/main/java/com/yahoo/ycsb/db/ignite/package-info.java create mode 100644 ignite/src/main/resources/log4j2.xml create mode 100644 ignite/src/test/java/com/yahoo/ycsb/db/ignite/IgniteClientCommonTest.java create mode 100644 ignite/src/test/java/com/yahoo/ycsb/db/ignite/IgniteClientTest.java create mode 100644 ignite/src/test/java/com/yahoo/ycsb/db/ignite/IgniteSqlClientTest.java diff --git a/.travis.yml b/.travis.yml index 84eeb0d0..5f82a45a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,8 +32,10 @@ install: mvn install -q -DskipTests=true script: mvn test -q + # Services to start for tests. services: + - ignite - mongodb # temporarily disable riak. failing, docs offline. # - riak diff --git a/bin/bindings.properties b/bin/bindings.properties index a4e84720..40758c4a 100644 --- a/bin/bindings.properties +++ b/bin/bindings.properties @@ -55,6 +55,8 @@ hbase12:com.yahoo.ycsb.db.hbase12.HBaseClient12 hbase14:com.yahoo.ycsb.db.hbase14.HBaseClient14 hbase20:com.yahoo.ycsb.db.hbase14.HBaseClient20 hypertable:com.yahoo.ycsb.db.HypertableClient +ignite:com.yahoo.ycsb.db.ignite.IgniteClient +ignite-sql:com.yahoo.ycsb.db.ignite.IgniteSqlClient infinispan-cs:com.yahoo.ycsb.db.InfinispanRemoteClient infinispan:com.yahoo.ycsb.db.InfinispanClient jdbc:com.yahoo.ycsb.db.JdbcDBClient diff --git a/bin/ycsb b/bin/ycsb index 9f075f34..61573294 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -80,6 +80,8 @@ DATABASES = { "hbase14" : "com.yahoo.ycsb.db.hbase14.HBaseClient14", "hbase20" : "com.yahoo.ycsb.db.hbase20.HBaseClient20", "hypertable" : "com.yahoo.ycsb.db.HypertableClient", + "ignite" : "com.yahoo.ycsb.db.ignite.IgniteClient", + "ignite-sql" : "com.yahoo.ycsb.db.ignite.IgniteSqlClient", "infinispan-cs": "com.yahoo.ycsb.db.InfinispanRemoteClient", "infinispan" : "com.yahoo.ycsb.db.InfinispanClient", "jdbc" : "com.yahoo.ycsb.db.JdbcDBClient", diff --git a/distribution/pom.xml b/distribution/pom.xml index 58cbf86c..40a0ffa7 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -164,6 +164,11 @@ LICENSE file. <artifactId>hypertable-binding</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>ignite-binding</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>com.yahoo.ycsb</groupId> <artifactId>infinispan-binding</artifactId> diff --git a/ignite/README.md b/ignite/README.md new file mode 100644 index 00000000..4fb9e62f --- /dev/null +++ b/ignite/README.md @@ -0,0 +1,70 @@ +<!-- +Copyright (c) 2018 YCSB contributors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +## Quick Start + +This section describes how to run YCSB on [Apache Ignite](https://ignite.apache.org). + +### 1. Set Up YCSB + +Git clone YCSB and compile: + + git clone http://github.com/brianfrankcooper/YCSB.git + cd YCSB + mvn -pl com.yahoo.ycsb:ignite-binding -am clean package + +### 2. Start Apache Ignite +1.1 Download latest binary [Apache Ignite release](https://ignite.apache.org/download.cgi#binaries) + +1.2 Start ignite nodes using apache-ignite-fabric-2.5.0-bin/bin/**ignite.sh** ignite.xml + +1.3 Copy YCSB/ignite/target/ignite-binding-0.15.0-SNAPSHOT.jar to apache-ignite-fabric-2.5.0-bin/libs + +Note: Please use YCSB/ignite/resources/**ignite.xml** for running **IgniteClient** tests and **ignite-sql.xml** for +**IgniteSqlClient** tests. Pay attention that some parameters such us **storagePath**, ****_walPath_****, ****_walArchivePath_**** +should be overwritten by certain pathes. Also please add ip addresses of your host inside the bean **TcpDiscoveryVmIpFinder** + +More information about Apache Ignite WAL (Write Ahead Log): https://apacheignite.readme.io/docs/write-ahead-log +### 3. Load Data and Run Tests + +Load the data: + + .bin/ycsb load ignite -p hosts="10.0.0.1" + -s -P workloads/workloada \ + -threads 4 \ + -p operationcount=100000 \ + -p recordcount=100000 \ + > outputload.txt +Note: '10.0.0.1' is ip address of one of hosts where was started Apache Ignite nodes. + +Run the workload test with IgniteClient: + + .bin/ycsb run ignite -p hosts="10.0.0.1" + -s -P workloads/workloada \ + -threads 4 \ + -p operationcount=100000 \ + -p recordcount=100000 \ + > outputload.txt + +Run the workload test with IgniteSqlClient: + + .bin/ycsb run ignite-sql -p hosts="10.0.0.1" + -s -P workloads/workloada \ + -threads 4 \ + -p operationcount=100000 \ + -p recordcount=100000 \ + > outputload.txt diff --git a/ignite/pom.xml b/ignite/pom.xml new file mode 100644 index 00000000..959a9c54 --- /dev/null +++ b/ignite/pom.xml @@ -0,0 +1,104 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- +Copyright (c) 2012-2018 YCSB contributors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>binding-parent</artifactId> + <version>0.15.0-SNAPSHOT</version> + <relativePath>../binding-parent</relativePath> + </parent> + + <repositories> + <repository> + <id>GridGain External Repository</id> + <url>http://www.gridgainsystems.com/nexus/content/repositories/external</url> + </repository> + </repositories> + + <artifactId>ignite-binding</artifactId> + <name>Apache Ignite</name> + <packaging>jar</packaging> + + <properties> + <skipJDK9Tests>true</skipJDK9Tests> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${ignite.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-indexing</artifactId> + <version>${ignite.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-log4j2</artifactId> + <version>${ignite.version}</version> + </dependency> + + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>1.7.21</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-library</artifactId> + <version>1.3</version> + </dependency> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>2.11.0</version> + </dependency> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>2.11.0</version> + </dependency> + </dependencies> +</project> diff --git a/ignite/resources/config/README.md b/ignite/resources/config/README.md new file mode 100644 index 00000000..d9cd965d --- /dev/null +++ b/ignite/resources/config/README.md @@ -0,0 +1,18 @@ +<!-- +Copyright (c) 2018 YCSB contributors. +All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> +Contains the template of the Ignite configuration used by IgniteClient and IgniteSqlClient. diff --git a/ignite/resources/config/ignite-sql.xml b/ignite/resources/config/ignite-sql.xml new file mode 100644 index 00000000..0986d5cb --- /dev/null +++ b/ignite/resources/config/ignite-sql.xml @@ -0,0 +1,121 @@ +<?xml version="1.0" encoding="UTF-8"?> + + <!-- + Copyright (c) 2018 YCSB contributors. All rights reserved. + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + + <!-- + Ignite Spring configuration file to startup Ignite cache. + This file demonstrates how to configure cache using Spring. Provided cache + will be created on node startup. + Use this configuration file when running HTTP REST examples (see 'examples/rest' folder). + When starting a standalone node, you need to execute the following command: + {IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/example-cache.xml + When starting Ignite from Java IDE, pass path to this file to Ignition: + Ignition.start("examples/config/example-cache.xml"); + --> + +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd"> +<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + + <property name="dataStorageConfiguration"> + <bean class="org.apache.ignite.configuration.DataStorageConfiguration"> + <property name="walMode" value="LOG_ONLY"/> + <property name="storagePath" value="/path/to/ignite/db/dir"/> + <property name="walPath" value="/path/to/ignite/wal/dir"/> + <property name="walArchivePath" value="/path/to/ignite/walArch/dir"/> + <property name="walHistorySize" value="1"/> + <property name="metricsEnabled" value="true"/> + + <property name="defaultDataRegionConfiguration"> + <bean class="org.apache.ignite.configuration.DataRegionConfiguration"> + <property name="name" value="default_data_region"/> + <property name="persistenceEnabled" value="true"/> + <!-- Setting the max size of the default region to 10GB. --> + <property name="maxSize" value="#{10L * 1024 * 1024 * 1024}"/> + <!-- Setting the initial size of the default region to 10GB. --> + <property name="initialSize" value="#{10L * 1024 * 1024 * 1024}"/> + <property name="checkpointPageBufferSize" value="#{1L * 1024 * 1024 * 1024}"/> + <property name="metricsEnabled" value="true"/> + </bean> + </property> + </bean> + </property> + + + <property name="cacheConfiguration"> + <list> + <bean class="org.apache.ignite.configuration.CacheConfiguration"> + <property name="name" value="usertable"/> + <property name="atomicityMode" value="ATOMIC"/> + <property name="cacheMode" value="PARTITIONED"/> + <property name="backups" value="1"/> + <property name="writeSynchronizationMode" value="FULL_SYNC"/> + + <property name="queryEntities"> + <list> + <bean class="org.apache.ignite.cache.QueryEntity"> + <property name="keyType" value="java.lang.String"/> + <property name="valueType" value="UsertableType"/> + <property name="tableName" value="usertable"/> + <property name="keyFieldName" value="ycsb_key"/> + <property name="fields"> + <map> + <entry key="ycsb_key" value="java.lang.String"/> + <entry key="field0" value="java.lang.String"/> + <entry key="field1" value="java.lang.String"/> + <entry key="field2" value="java.lang.String"/> + <entry key="field3" value="java.lang.String"/> + <entry key="field4" value="java.lang.String"/> + <entry key="field5" value="java.lang.String"/> + <entry key="field6" value="java.lang.String"/> + <entry key="field7" value="java.lang.String"/> + <entry key="field8" value="java.lang.String"/> + <entry key="field9" value="java.lang.String"/> + </map> + </property> + </bean> + </list> + </property> + + </bean> + </list> + </property> + + + <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --> + <property name="discoverySpi"> + <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <property name="addresses"> + <list> + <!--The list of hosts includes client host. --> + <!--<value><hostname_or_IP>:47500..47509</value>--> + <!--<value><hostname_or_IP>:47500..47509</value>--> + </list> + </property> + </bean> + </property> + </bean> + </property> +</bean> +</beans> diff --git a/ignite/resources/config/ignite.xml b/ignite/resources/config/ignite.xml new file mode 100644 index 00000000..1586e263 --- /dev/null +++ b/ignite/resources/config/ignite.xml @@ -0,0 +1,94 @@ +<?xml version="1.0" encoding="UTF-8"?> + + <!-- + Copyright (c) 2018 YCSB contributors. All rights reserved. + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + + <!-- + Ignite Spring configuration file to startup Ignite cache. + This file demonstrates how to configure cache using Spring. Provided cache + will be created on node startup. + Use this configuration file when running HTTP REST examples (see 'examples/rest' folder). + When starting a standalone node, you need to execute the following command: + {IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/example-cache.xml + When starting Ignite from Java IDE, pass path to this file to Ignition: + Ignition.start("examples/config/example-cache.xml"); + --> + +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd"> +<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + + <property name="dataStorageConfiguration"> + <bean class="org.apache.ignite.configuration.DataStorageConfiguration"> + <property name="walMode" value="LOG_ONLY"/> + <property name="storagePath" value="/path/to/ignite/db/dir"/> + <property name="walPath" value="/path/to/ignite/wal/dir"/> + <property name="walArchivePath" value="/path/to/ignite/walArch/dir"/> + <property name="walHistorySize" value="1"/> + <property name="metricsEnabled" value="true"/> + + <property name="defaultDataRegionConfiguration"> + <bean class="org.apache.ignite.configuration.DataRegionConfiguration"> + <property name="name" value="default_data_region"/> + <property name="persistenceEnabled" value="true"/> + <!-- Setting the max size of the default region to 10GB. --> + <property name="maxSize" value="#{10L * 1024 * 1024 * 1024}"/> + <!-- Setting the initial size of the default region to 10GB. --> + <property name="initialSize" value="#{10L * 1024 * 1024 * 1024}"/> + <property name="checkpointPageBufferSize" value="#{1L * 1024 * 1024 * 1024}"/> + <property name="metricsEnabled" value="true"/> + </bean> + </property> + </bean> + </property> + + + <property name="cacheConfiguration"> + <list> + <bean class="org.apache.ignite.configuration.CacheConfiguration"> + <property name="name" value="usertable"/> + <property name="atomicityMode" value="ATOMIC"/> + <property name="cacheMode" value="PARTITIONED"/> + <property name="backups" value="1"/> + <property name="writeSynchronizationMode" value="FULL_SYNC"/> + </bean> + </list> + </property> + + + <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --> + <property name="discoverySpi"> + <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <property name="addresses"> + <list> + <!--The list of hosts includes client host. --> + <!--<value><hostname_or_IP>:47500..47509</value>--> + <!--<value><hostname_or_IP>:47500..47509</value>--> + </list> + </property> + </bean> + </property> + </bean> + </property> +</bean> +</beans> diff --git a/ignite/src/main/java/com/yahoo/ycsb/db/ignite/IgniteAbstractClient.java b/ignite/src/main/java/com/yahoo/ycsb/db/ignite/IgniteAbstractClient.java new file mode 100644 index 00000000..fe9518ca --- /dev/null +++ b/ignite/src/main/java/com/yahoo/ycsb/db/ignite/IgniteAbstractClient.java @@ -0,0 +1,151 @@ +package com.yahoo.ycsb.db.ignite; + +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.logger.log4j2.Log4J2Logger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Ignite abstract client. + * <p> + * See {@code ignite/README.md} for details. + * + * @author Sergey Puchnin + * @author Taras Ledkov + * @author Oleg Ostanin + */ +public abstract class IgniteAbstractClient extends DB { + /** */ + protected static Logger log = LogManager.getLogger(IgniteAbstractClient.class); + + protected static final String DEFAULT_CACHE_NAME = "usertable"; + protected static final String HOSTS_PROPERTY = "hosts"; + protected static final String PORTS_PROPERTY = "ports"; + protected static final String CLIENT_NODE_NAME = "YCSB client node"; + protected static final String PORTS_DEFAULTS = "47500..47509"; + + /** + * Count the number of times initialized to teardown on the last + * {@link #cleanup()}. + */ + protected static final AtomicInteger INIT_COUNT = new AtomicInteger(0); + /** Ignite cluster. */ + protected static Ignite cluster = null; + /** Ignite cache to store key-values. */ + protected static IgniteCache<String, BinaryObject> cache = null; + /** Debug flag. */ + protected static boolean debug = false; + + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + + /** + * Initialize any state for this DB. Called once per DB instance; there is one + * DB instance per client thread. + */ + @Override + public void init() throws DBException { + + // Keep track of number of calls to init (for later cleanup) + INIT_COUNT.incrementAndGet(); + + // Synchronized so that we only have a single + // cluster/session instance for all the threads. + synchronized (INIT_COUNT) { + + // Check if the cluster has already been initialized + if (cluster != null) { + return; + } + + try { + debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); + + IgniteConfiguration igcfg = new IgniteConfiguration(); + igcfg.setIgniteInstanceName(CLIENT_NODE_NAME); + + String host = getProperties().getProperty(HOSTS_PROPERTY); + if (host == null) { + throw new DBException(String.format( + "Required property \"%s\" missing for Ignite Cluster", + HOSTS_PROPERTY)); + } + + String ports = getProperties().getProperty(PORTS_PROPERTY, PORTS_DEFAULTS); + + if (ports == null) { + throw new DBException(String.format( + "Required property \"%s\" missing for Ignite Cluster", + PORTS_PROPERTY)); + + } + + System.setProperty("IGNITE_QUIET", "false"); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + Collection<String> addrs = new LinkedHashSet<>(); + addrs.add(host + ":" + ports); + + ((TcpDiscoveryVmIpFinder) ipFinder).setAddresses(addrs); + disco.setIpFinder(ipFinder); + + igcfg.setDiscoverySpi(disco); + igcfg.setNetworkTimeout(2000); + igcfg.setClientMode(true); + + Log4J2Logger logger = new Log4J2Logger(this.getClass().getClassLoader().getResource("log4j2.xml")); + igcfg.setGridLogger(logger); + + log.info("Start Ignite client node."); + cluster = Ignition.start(igcfg); + + log.info("Activate Ignite cluster."); + cluster.active(true); + + cache = cluster.cache(DEFAULT_CACHE_NAME).withKeepBinary(); + + if(cache == null) { + throw new DBException(new IgniteCheckedException("Failed to find cache " + DEFAULT_CACHE_NAME)); + } + } catch (Exception e) { + throw new DBException(e); + } + } // synchronized + } + + /** + * Cleanup any state for this DB. Called once per DB instance; there is one DB + * instance per client thread. + */ + @Override + public void cleanup() throws DBException { + synchronized (INIT_COUNT) { + final int curInitCount = INIT_COUNT.decrementAndGet(); + + if (curInitCount <= 0) { + cluster.close(); + cluster = null; + } + + if (curInitCount < 0) { + // This should never happen. + throw new DBException( + String.format("initCount is negative: %d", curInitCount)); + } + } + } +} diff --git a/ignite/src/main/java/com/yahoo/ycsb/db/ignite/IgniteClient.java b/ignite/src/main/java/com/yahoo/ycsb/db/ignite/IgniteClient.java new file mode 100644 index 00000000..e61fa618 --- /dev/null +++ b/ignite/src/main/java/com/yahoo/ycsb/db/ignite/IgniteClient.java @@ -0,0 +1,252 @@ +/** + * Copyright (c) 2013-2018 YCSB contributors. All rights reserved. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. See accompanying LICENSE file. + * <p> + */ +package com.yahoo.ycsb.db.ignite; + +import com.yahoo.ycsb.*; +import org.apache.ignite.binary.BinaryField; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.internal.util.typedef.F; + +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * Ignite client. + * <p> + * See {@code ignite/README.md} for details. + * + * @author Sergey Puchnin + * @author Taras Ledkov + * @author Oleg Ostanin + */ +public class IgniteClient extends IgniteAbstractClient { + /** */ + private static Logger log = LogManager.getLogger(IgniteClient.class); + + + + /** Cached binary type. */ + private BinaryType binType = null; + /** Cached binary type's fields. */ + private final ConcurrentHashMap<String, BinaryField> fieldsCache = new ConcurrentHashMap<>(); + + + + + /** + * Read a record from the database. Each field/value pair from the result will + * be stored in a HashMap. + * + * @param table The name of the table + * @param key The record key of the record to read. + * @param fields The list of fields to read, or null for all of them + * @param result A HashMap of field/value pairs for the result + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status read(String table, String key, Set<String> fields, + Map<String, ByteIterator> result) { + try { + BinaryObject po = cache.get(key); + + if (po == null) { + return Status.NOT_FOUND; + } + + if (binType == null) { + binType = po.type(); + } + + for (String s : F.isEmpty(fields) ? binType.fieldNames() : fields) { + BinaryField bfld = fieldsCache.get(s); + + if (bfld == null) { + bfld = binType.field(s); + fieldsCache.put(s, bfld); + } + + String val = bfld.value(po); + if (val != null) { + result.put(s, new StringByteIterator(val)); + } + + if (debug) { + log.info("table:{" + table + "}, key:{" + key + "}" + ", fields:{" + fields + "}"); + log.info("fields in po{" + binType.fieldNames() + "}"); + log.info("result {" + result + "}"); + } + } + + return Status.OK; + + } catch (Exception e) { + log.error(String.format("Error reading key: %s", key), e); + + return Status.ERROR; + } + } + + /** + * Perform a range scan for a set of records in the database. Each field/value + * pair from the result will be stored in a HashMap. + * + * @param table The name of the table + * @param startkey The record key of the first record to read. + * @param recordcount The number of records to read + * @param fields The list of fields to read, or null for all of them + * @param result A Vector of HashMaps, where each HashMap is a set field/value + * pairs for one record + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status scan(String table, String startkey, int recordcount, + Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { + throw new UnsupportedOperationException("Scan method isn't implemented"); + } + + /** + * Update a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key, overwriting any existing values with the same field name. + * + * @param table The name of the table + * @param key The record key of the record to write. + * @param values A HashMap of field/value pairs to update in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status update(String table, String key, + Map<String, ByteIterator> values) { + try { + cache.invoke(key, new Updater(values)); + + return Status.OK; + } catch (Exception e) { + log.error(String.format("Error updating key: %s", key), e); + + return Status.ERROR; + } + } + + /** + * Insert a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key. + * + * @param table The name of the table + * @param key The record key of the record to insert. + * @param values A HashMap of field/value pairs to insert in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status insert(String table, String key, + Map<String, ByteIterator> values) { + try { + BinaryObjectBuilder bob = cluster.binary().builder("CustomType"); + + for (Map.Entry<String, ByteIterator> entry : values.entrySet()) { + bob.setField(entry.getKey(), entry.getValue().toString()); + + if (debug) { + log.info(entry.getKey() + ":" + entry.getValue()); + } + } + + BinaryObject bo = bob.build(); + + if (table.equals(DEFAULT_CACHE_NAME)) { + cache.put(key, bo); + } else { + throw new UnsupportedOperationException("Unexpected table name: " + table); + } + + return Status.OK; + } catch (Exception e) { + log.error(String.format("Error inserting key: %s", key), e); + + return Status.ERROR; + } + } + + /** + * Delete a record from the database. + * + * @param table The name of the table + * @param key The record key of the record to delete. + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status delete(String table, String key) { + try { + cache.remove(key); + return Status.OK; + } catch (Exception e) { + log.error(String.format("Error deleting key: %s ", key), e); + } + + return Status.ERROR; + } + + /** + * Entry processor to update values. + */ + public static class Updater implements CacheEntryProcessor<String, BinaryObject, Object> { + private String[] flds; + private String[] vals; + + /** + * @param values Updated fields. + */ + Updater(Map<String, ByteIterator> values) { + flds = new String[values.size()]; + vals = new String[values.size()]; + + int idx = 0; + for (Map.Entry<String, ByteIterator> e : values.entrySet()) { + flds[idx] = e.getKey(); + vals[idx] = e.getValue().toString(); + ++idx; + } + } + + /** + * {@inheritDoc} + */ + @Override + public Object process(MutableEntry<String, BinaryObject> mutableEntry, Object... objects) + throws EntryProcessorException { + BinaryObjectBuilder bob = mutableEntry.getValue().toBuilder(); + + for (int i = 0; i < flds.length; ++i) { + bob.setField(flds[i], vals[i]); + } + + mutableEntry.setValue(bob.build()); + + return null; + } + } +} diff --git a/ignite/src/main/java/com/yahoo/ycsb/db/ignite/IgniteSqlClient.java b/ignite/src/main/java/com/yahoo/ycsb/db/ignite/IgniteSqlClient.java new file mode 100644 index 00000000..8a3222e5 --- /dev/null +++ b/ignite/src/main/java/com/yahoo/ycsb/db/ignite/IgniteSqlClient.java @@ -0,0 +1,299 @@ +/** + * Copyright (c) 2013-2018 YCSB contributors. All rights reserved. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. See accompanying LICENSE file. + * <p> + */ +package com.yahoo.ycsb.db.ignite; + +import com.yahoo.ycsb.*; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.internal.util.typedef.F; + +import javax.cache.CacheException; +import java.util.*; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Ignite client. + * <p> + * See {@code ignite/README.md} for details. + * + * @author Sergey Puchnin + * @author Taras Ledkov + * @author Oleg Ostanin + */ +public class IgniteSqlClient extends IgniteAbstractClient { + /** */ + private static Logger log = LogManager.getLogger(IgniteSqlClient.class); + /** */ + private static final String PRIMARY_KEY = "YCSB_KEY"; + /** + * Read a record from the database. Each field/value pair from the result will + * be stored in a HashMap. + * + * @param table The name of the table + * @param key The record key of the record to read. + * @param fields The list of fields to read, or null for all of them + * @param result A HashMap of field/value pairs for the result + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status read(String table, String key, Set<String> fields, + Map<String, ByteIterator> result) { + try { + StringBuilder sb = new StringBuilder("SELECT * FROM ").append(table) + .append(" WHERE ").append(PRIMARY_KEY).append("=?"); + + SqlFieldsQuery qry = new SqlFieldsQuery(sb.toString()); + qry.setArgs(key); + + FieldsQueryCursor<List<?>> cur = cache.query(qry); + Iterator<List<?>> it = cur.iterator(); + + if (!it.hasNext()) { + return Status.NOT_FOUND; + } + + String[] colNames = new String[cur.getColumnsCount()]; + for (int i = 0; i < colNames.length; ++i) { + String colName = cur.getFieldName(i); + if (F.isEmpty(fields)) { + colNames[i] = colName.toLowerCase(); + } else { + for (String f : fields) { + if (f.equalsIgnoreCase(colName)) { + colNames[i] = f; + } + } + } + } + + while (it.hasNext()) { + List<?> row = it.next(); + + for (int i = 0; i < colNames.length; ++i) { + if (colNames[i] != null) { + result.put(colNames[i], new StringByteIterator((String) row.get(i))); + } + } + } + + return Status.OK; + } catch (Exception e) { + log.error(String.format("Error in processing read from table: %s", table), e); + + return Status.ERROR; + } + } + + /** + Unsupported operation. + */ + @Override + public Status scan(String table, String startkey, int recordcount, + Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { + try { + return Status.OK; + + } catch (Exception e) { + log.error(String.format("Error scanning with startkey: %s", startkey), e); + + return Status.ERROR; + } + + } + + /** + * Update a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key, overwriting any existing values with the same field name. + * + * @param table The name of the table + * @param key The record key of the record to write. + * @param values A HashMap of field/value pairs to update in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status update(String table, String key, + Map<String, ByteIterator> values) { + while (true) { + try { + UpdateData updData = new UpdateData(key, values); + StringBuilder sb = new StringBuilder("UPDATE ").append(table).append(" SET "); + + for (int i = 0; i < updData.getFields().length; ++i) { + sb.append(updData.getFields()[i]).append("=?"); + if (i < updData.getFields().length - 1) { + sb.append(", "); + } + } + + sb.append(" WHERE ").append(PRIMARY_KEY).append("=?"); + + SqlFieldsQuery qry = new SqlFieldsQuery(sb.toString()); + qry.setArgs(updData.getArgs()); + + cache.query(qry).getAll(); + + return Status.OK; + } catch (CacheException e) { + if (!e.getMessage().contains("Failed to update some keys because they had been modified concurrently")) { + log.error(String.format("Error in processing update table: %s", table), e); + + return Status.ERROR; + } + } catch (Exception e) { + log.error(String.format("Error in processing update table: %s", table), e); + + return Status.ERROR; + } + } + } + + /** + * Insert a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key. + * + * @param table The name of the table + * @param key The record key of the record to insert. + * @param values A HashMap of field/value pairs to insert in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status insert(String table, String key, Map<String, ByteIterator> values) { + try { + InsertData insertData = new InsertData(key, values); + StringBuilder sb = new StringBuilder("INSERT INTO ").append(table).append(" (") + .append(insertData.getInsertFields()).append(") VALUES (") + .append(insertData.getInsertParams()).append(')'); + + SqlFieldsQuery qry = new SqlFieldsQuery(sb.toString()); + qry.setArgs(insertData.getArgs()); + + cache.query(qry).getAll(); + + return Status.OK; + } catch (Exception e) { + log.error(String.format("Error in processing insert to table: %s", table), e); + + return Status.ERROR; + } + } + + /** + * Delete a record from the database. + * + * @param table The name of the table + * @param key The record key of the record to delete. + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status delete(String table, String key) { + try { + StringBuilder sb = new StringBuilder("DELETE FROM ").append(table) + .append(" WHERE ").append(PRIMARY_KEY).append(" = ?"); + + SqlFieldsQuery qry = new SqlFieldsQuery(sb.toString()); + qry.setArgs(key); + cache.query(qry).getAll(); + return Status.OK; + } catch (Exception e) { + log.error(String.format("Error in processing read from table: %s", table), e); + + return Status.ERROR; + } + } + + /** + * Field and values for insert queries. + */ + private static class InsertData { + private final Object[] args; + private final String insertFields; + private final String insertParams; + + /** + * @param key Key. + * @param values Field values. + */ + InsertData(String key, Map<String, ByteIterator> values) { + args = new String[values.size() + 1]; + + int idx = 0; + args[idx++] = key; + + StringBuilder sbFields = new StringBuilder(PRIMARY_KEY); + StringBuilder sbParams = new StringBuilder("?"); + + for (Map.Entry<String, ByteIterator> e : values.entrySet()) { + args[idx++] = e.getValue().toString(); + sbFields.append(',').append(e.getKey()); + sbParams.append(", ?"); + } + + insertFields = sbFields.toString(); + insertParams = sbParams.toString(); + } + + public Object[] getArgs() { + return args; + } + + public String getInsertFields() { + return insertFields; + } + + public String getInsertParams() { + return insertParams; + } + } + + /** + * Field and values for update queries. + */ + private static class UpdateData { + private final Object[] args; + private final String[] fields; + + /** + * @param key Key. + * @param values Field values. + */ + UpdateData(String key, Map<String, ByteIterator> values) { + args = new String[values.size() + 1]; + fields = new String[values.size()]; + + int idx = 0; + + for (Map.Entry<String, ByteIterator> e : values.entrySet()) { + args[idx] = e.getValue().toString(); + fields[idx++] = e.getKey(); + } + + args[idx] = key; + } + + public Object[] getArgs() { + return args; + } + + public String[] getFields() { + return fields; + } + } +} diff --git a/ignite/src/main/java/com/yahoo/ycsb/db/ignite/package-info.java b/ignite/src/main/java/com/yahoo/ycsb/db/ignite/package-info.java new file mode 100644 index 00000000..25c75c92 --- /dev/null +++ b/ignite/src/main/java/com/yahoo/ycsb/db/ignite/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2014, Yahoo!, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + * The YCSB binding for <a href="http://ignite.apache.org/">Ignite</a>. + * Naive implementation. + */ +package com.yahoo.ycsb.db.ignite; + diff --git a/ignite/src/main/resources/log4j2.xml b/ignite/src/main/resources/log4j2.xml new file mode 100644 index 00000000..a221422f --- /dev/null +++ b/ignite/src/main/resources/log4j2.xml @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<Configuration status="INFO"> + <Appenders> + <Console name="CONSOLE" target="SYSTEM_OUT"> + <PatternLayout pattern="[%d{HH:mm:ss}][%-5p][%t] %m%n"/> + </Console> + + <RollingFile name="ROLLING_FILE" fileName="ignite-binding.log" + filePattern="ignite-binding-%i-%d{yyyy-MM-dd}.log.gz"> + <PatternLayout pattern="[%d{yyyy-MM-dd HH:mm:ss,SSS}][%-5p][%t] %m%n"/> + <Policies> + <TimeBasedTriggeringPolicy/> + <SizeBasedTriggeringPolicy size="2 MB"/> + </Policies> + <DefaultRolloverStrategy max="200"/> + </RollingFile> + </Appenders> + <Loggers> + <Root level="info"> + <AppenderRef ref="CONSOLE"/> + <AppenderRef ref="ROLLING_FILE"/> + </Root> + </Loggers> +</Configuration> diff --git a/ignite/src/test/java/com/yahoo/ycsb/db/ignite/IgniteClientCommonTest.java b/ignite/src/test/java/com/yahoo/ycsb/db/ignite/IgniteClientCommonTest.java new file mode 100644 index 00000000..a522124b --- /dev/null +++ b/ignite/src/test/java/com/yahoo/ycsb/db/ignite/IgniteClientCommonTest.java @@ -0,0 +1,40 @@ +package com.yahoo.ycsb.db.ignite; + +import com.yahoo.ycsb.DB; +import org.apache.ignite.Ignite; +import org.junit.After; +import org.junit.AfterClass; + +/** + * Common test class. + */ +public class IgniteClientCommonTest { + /** */ + protected static Ignite cluster; + + /** */ + protected DB client; + + /** + * + */ + @After + public void tearDown() throws Exception { + client.cleanup(); + } + + /** + * + */ + @AfterClass + public static void afterClass() { + cluster.close(); + + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/ignite/src/test/java/com/yahoo/ycsb/db/ignite/IgniteClientTest.java b/ignite/src/test/java/com/yahoo/ycsb/db/ignite/IgniteClientTest.java new file mode 100644 index 00000000..f14c5d7d --- /dev/null +++ b/ignite/src/test/java/com/yahoo/ycsb/db/ignite/IgniteClientTest.java @@ -0,0 +1,207 @@ +/** + * Copyright (c) 2015 YCSB contributors All rights reserved. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.ignite; + +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import com.yahoo.ycsb.measurements.Measurements; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.logger.log4j2.Log4J2Logger; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.*; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; + +/** + * Integration tests for the Ignite client + */ +public class IgniteClientTest extends IgniteClientCommonTest { + private static final String DEFAULT_CACHE_NAME = "usertable"; + private final static String HOST = "127.0.0.1"; + private final static String PORTS = "47500..47509"; + private final static String SERVER_NODE_NAME = "YCSB Server Node"; + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + @BeforeClass + public static void beforeTest() throws IgniteCheckedException { + IgniteConfiguration igcfg = new IgniteConfiguration(); + igcfg.setIgniteInstanceName(SERVER_NODE_NAME); + igcfg.setClientMode(false); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + Collection<String> adders = new LinkedHashSet<>(); + adders.add(HOST + ":" + PORTS); + + ((TcpDiscoveryVmIpFinder) ipFinder).setAddresses(adders); + disco.setIpFinder(ipFinder); + + igcfg.setDiscoverySpi(disco); + igcfg.setNetworkTimeout(2000); + + CacheConfiguration ccfg = new CacheConfiguration().setName(DEFAULT_CACHE_NAME); + + igcfg.setCacheConfiguration(ccfg); + + Log4J2Logger logger = new Log4J2Logger(IgniteClientTest.class.getClassLoader().getResource("log4j2.xml")); + igcfg.setGridLogger(logger); + + cluster = Ignition.start(igcfg); + cluster.active(); + } + + @Before + public void setUp() throws Exception { + Properties p = new Properties(); + p.setProperty("hosts", HOST); + p.setProperty("ports", PORTS); + Measurements.setProperties(p); + + client = new IgniteClient(); + client.setProperties(p); + client.init(); + } + + @Test + public void testInsert() throws Exception { + cluster.cache(DEFAULT_CACHE_NAME).clear(); + final String key = "key"; + final Map<String, String> input = new HashMap<>(); + input.put("field0", "value1"); + input.put("field1", "value2"); + final Status status = client.insert(DEFAULT_CACHE_NAME, key, StringByteIterator.getByteIteratorMap(input)); + assertThat(status, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(1)); + } + + @Test + public void testDelete() throws Exception { + cluster.cache(DEFAULT_CACHE_NAME).clear(); + final String key1 = "key1"; + final Map<String, String> input1 = new HashMap<>(); + input1.put("field0", "value1"); + input1.put("field1", "value2"); + final Status status1 = client.insert(DEFAULT_CACHE_NAME, key1, StringByteIterator.getByteIteratorMap(input1)); + assertThat(status1, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(1)); + + final String key2 = "key2"; + final Map<String, String> input2 = new HashMap<>(); + input2.put("field0", "value1"); + input2.put("field1", "value2"); + final Status status2 = client.insert(DEFAULT_CACHE_NAME, key2, StringByteIterator.getByteIteratorMap(input2)); + assertThat(status2, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(2)); + + final Status status3 = client.delete(DEFAULT_CACHE_NAME, key2); + assertThat(status3, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(1)); + + } + + @Test + public void testRead() throws Exception { + cluster.cache(DEFAULT_CACHE_NAME).clear(); + final String key = "key"; + final Map<String, String> input = new HashMap<>(); + input.put("field0", "value1"); + input.put("field1", "value2A"); + input.put("field3", null); + final Status sPut = client.insert(DEFAULT_CACHE_NAME, key, StringByteIterator.getByteIteratorMap(input)); + assertThat(sPut, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(1)); + + final Set<String> fld = new TreeSet<>(); + fld.add("field0"); + fld.add("field1"); + fld.add("field3"); + + final HashMap<String, ByteIterator> result = new HashMap<>(); + final Status sGet = client.read(DEFAULT_CACHE_NAME, key, fld, result); + assertThat(sGet, is(Status.OK)); + + final HashMap<String, String> strResult = new HashMap<String, String>(); + for (final Map.Entry<String, ByteIterator> e : result.entrySet()) { + if (e.getValue() != null) { + strResult.put(e.getKey(), e.getValue().toString()); + } + } + assertThat(strResult, hasEntry("field0", "value1")); + assertThat(strResult, hasEntry("field1", "value2A")); + } + + @Test + public void testReadAllFields() throws Exception { + cluster.cache(DEFAULT_CACHE_NAME).clear(); + final String key = "key"; + final Map<String, String> input = new HashMap<>(); + input.put("field0", "value1"); + input.put("field1", "value2A"); + input.put("field3", null); + final Status sPut = client.insert(DEFAULT_CACHE_NAME, key, StringByteIterator.getByteIteratorMap(input)); + assertThat(sPut, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(1)); + + final Set<String> fld = new TreeSet<>(); + + final HashMap<String, ByteIterator> result1 = new HashMap<>(); + final Status sGet = client.read(DEFAULT_CACHE_NAME, key, fld, result1); + assertThat(sGet, is(Status.OK)); + + final HashMap<String, String> strResult = new HashMap<String, String>(); + for (final Map.Entry<String, ByteIterator> e : result1.entrySet()) { + if (e.getValue() != null) { + strResult.put(e.getKey(), e.getValue().toString()); + } + } + assertThat(strResult, hasEntry("field0", "value1")); + assertThat(strResult, hasEntry("field1", "value2A")); + } + + @Test + public void testReadNotPresent() throws Exception { + cluster.cache(DEFAULT_CACHE_NAME).clear(); + final String key = "key"; + final Map<String, String> input = new HashMap<>(); + input.put("field0", "value1"); + input.put("field1", "value2A"); + input.put("field3", null); + final Status sPut = client.insert(DEFAULT_CACHE_NAME, key, StringByteIterator.getByteIteratorMap(input)); + assertThat(sPut, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(1)); + + final Set<String> fld = new TreeSet<>(); + + final String newKey = "newKey"; + final HashMap<String, ByteIterator> result1 = new HashMap<>(); + final Status sGet = client.read(DEFAULT_CACHE_NAME, newKey, fld, result1); + assertThat(sGet, is(Status.NOT_FOUND)); + + } +} diff --git a/ignite/src/test/java/com/yahoo/ycsb/db/ignite/IgniteSqlClientTest.java b/ignite/src/test/java/com/yahoo/ycsb/db/ignite/IgniteSqlClientTest.java new file mode 100644 index 00000000..876ccbc3 --- /dev/null +++ b/ignite/src/test/java/com/yahoo/ycsb/db/ignite/IgniteSqlClientTest.java @@ -0,0 +1,313 @@ +/** + * Copyright (c) 2015 YCSB contributors All rights reserved. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.ignite; + +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import com.yahoo.ycsb.measurements.Measurements; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.logger.log4j2.Log4J2Logger; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; + +/** + * Integration tests for the Ignite client + */ +public class IgniteSqlClientTest extends IgniteClientCommonTest { + private static final String DEFAULT_CACHE_NAME = "usertable"; + private static final String TABLE_NAME = "usertable"; + private final static String HOST = "127.0.0.1"; + private final static String PORTS = "47500..47509"; + private final static String SERVER_NODE_NAME = "YCSB Server Node"; + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * + */ + @BeforeClass + public static void beforeTest() throws IgniteCheckedException { + IgniteConfiguration igcfg = new IgniteConfiguration(); + igcfg.setIgniteInstanceName(SERVER_NODE_NAME); + igcfg.setClientMode(false); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + Collection<String> adders = new LinkedHashSet<>(); + adders.add(HOST + ":" + PORTS); + + QueryEntity qe = new QueryEntity("java.lang.String", "UserTableType") + .addQueryField("ycsb_key", "java.lang.String", null) + .addQueryField("field0", "java.lang.String", null) + .addQueryField("field1", "java.lang.String", null) + .addQueryField("field2", "java.lang.String", null) + .addQueryField("field3", "java.lang.String", null) + .addQueryField("field4", "java.lang.String", null) + .addQueryField("field5", "java.lang.String", null) + .addQueryField("field6", "java.lang.String", null) + .addQueryField("field7", "java.lang.String", null) + .addQueryField("field8", "java.lang.String", null) + .addQueryField("field9", "java.lang.String", null) + .setKeyFieldName("ycsb_key"); + + qe.setTableName("usertable"); + + CacheConfiguration ccfg = new CacheConfiguration().setQueryEntities(Collections.singleton(qe)) + .setName(DEFAULT_CACHE_NAME); + + igcfg.setCacheConfiguration(ccfg); + + ((TcpDiscoveryVmIpFinder) ipFinder).setAddresses(adders); + disco.setIpFinder(ipFinder); + + igcfg.setDiscoverySpi(disco); + igcfg.setNetworkTimeout(2000); + + Log4J2Logger logger = new Log4J2Logger(IgniteSqlClientTest.class.getClassLoader().getResource("log4j2.xml")); + igcfg.setGridLogger(logger); + + cluster = Ignition.start(igcfg); + cluster.active(); + } + + @Before + public void setUp() throws Exception { + Properties p = new Properties(); + p.setProperty("hosts", HOST); + p.setProperty("ports", PORTS); + Measurements.setProperties(p); + + client = new IgniteSqlClient(); + client.setProperties(p); + client.init(); + } + + @Test + public void testInsert() throws Exception { + cluster.cache(DEFAULT_CACHE_NAME).clear(); + final String key = "key"; + final Map<String, String> input = new HashMap<>(); + input.put("field0", "value1"); + input.put("field1", "value2"); + final Status status = client.insert(TABLE_NAME, key, StringByteIterator.getByteIteratorMap(input)); + assertThat(status, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(1)); + } + + @Test + public void testDelete() throws Exception { + cluster.cache(DEFAULT_CACHE_NAME).clear(); + final String key1 = "key1"; + final Map<String, String> input1 = new HashMap<>(); + input1.put("field0", "value1"); + input1.put("field1", "value2"); + final Status status1 = client.insert(TABLE_NAME, key1, StringByteIterator.getByteIteratorMap(input1)); + assertThat(status1, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(1)); + + final String key2 = "key2"; + final Map<String, String> input2 = new HashMap<>(); + input2.put("field0", "value1"); + input2.put("field1", "value2"); + final Status status2 = client.insert(TABLE_NAME, key2, StringByteIterator.getByteIteratorMap(input2)); + assertThat(status2, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(2)); + + final Status status3 = client.delete(TABLE_NAME, key2); + assertThat(status3, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(1)); + + } + + @Test + public void testRead() throws Exception { + cluster.cache(DEFAULT_CACHE_NAME).clear(); + final String key = "key"; + final Map<String, String> input = new HashMap<>(); + input.put("field0", "value1"); + input.put("field1", "value2A"); + input.put("field3", null); + final Status sPut = client.insert(TABLE_NAME, key, StringByteIterator.getByteIteratorMap(input)); + assertThat(sPut, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(1)); + + final Set<String> fld = new TreeSet<>(); + fld.add("field0"); + fld.add("field1"); + fld.add("field3"); + + final HashMap<String, ByteIterator> result = new HashMap<>(); + final Status sGet = client.read(TABLE_NAME, key, fld, result); + assertThat(sGet, is(Status.OK)); + + final HashMap<String, String> strResult = new HashMap<String, String>(); + for (final Map.Entry<String, ByteIterator> e : result.entrySet()) { + if (e.getValue() != null) { + strResult.put(e.getKey(), e.getValue().toString()); + } + } + assertThat(strResult, hasEntry("field0", "value1")); + assertThat(strResult, hasEntry("field1", "value2A")); + } + + @Test + public void testUpdate() throws Exception { + cluster.cache(DEFAULT_CACHE_NAME).clear(); + final String key = "key"; + final Map<String, String> input = new HashMap<>(); + input.put("field0", "value1"); + input.put("field1", "value2A"); + input.put("field3", null); + client.insert(TABLE_NAME, key, StringByteIterator.getByteIteratorMap(input)); + + input.put("field1", "value2B"); + input.put("field4", "value4A"); + + final Status sUpd = client.update(TABLE_NAME, key, StringByteIterator.getByteIteratorMap(input)); + assertThat(sUpd, is(Status.OK)); + + final Set<String> fld = new TreeSet<>(); + fld.add("field0"); + fld.add("field1"); + fld.add("field3"); + fld.add("field4"); + + final HashMap<String, ByteIterator> result = new HashMap<>(); + final Status sGet = client.read(TABLE_NAME, key, fld, result); + assertThat(sGet, is(Status.OK)); + + final HashMap<String, String> strResult = new HashMap<String, String>(); + for (final Map.Entry<String, ByteIterator> e : result.entrySet()) { + if (e.getValue() != null) { + strResult.put(e.getKey(), e.getValue().toString()); + } + } + assertThat(strResult, hasEntry("field0", "value1")); + assertThat(strResult, hasEntry("field1", "value2B")); + assertThat(strResult, hasEntry("field4", "value4A")); + } + + @Test + public void testConcurrentUpdate() throws Exception { + cluster.cache(DEFAULT_CACHE_NAME).clear(); + final String key = "key"; + final Map<String, String> input = new HashMap<>(); + input.put("field0", "value1"); + input.put("field1", "value2A"); + input.put("field3", null); + client.insert(TABLE_NAME, key, StringByteIterator.getByteIteratorMap(input)); + + input.put("field1", "value2B"); + input.put("field4", "value4A"); + + ExecutorService exec = Executors.newCachedThreadPool(); + + final AtomicLong l = new AtomicLong(0); + final Boolean[] updError = {false}; + + Runnable task = new Runnable() { + @Override + public void run() { + for (int i = 0; i < 100; ++i) { + input.put("field1", "value2B_" + l.incrementAndGet()); + final Status sUpd = client.update(TABLE_NAME, key, StringByteIterator.getByteIteratorMap(input)); + + if (!sUpd.isOk()) { + updError[0] = true; + break; + } + } + } + }; + + for (int i = 0; i < 32; ++i) { + exec.execute(task); + } + + exec.awaitTermination(60, TimeUnit.SECONDS); + exec.shutdownNow(); + + assertThat(updError[0], is(false)); + } + + @Test + public void testReadAllFields() throws Exception { + cluster.cache(DEFAULT_CACHE_NAME).clear(); + final String key = "key"; + final Map<String, String> input = new HashMap<>(); + input.put("field0", "value1"); + input.put("field1", "value2A"); + input.put("field3", null); + final Status sPut = client.insert(DEFAULT_CACHE_NAME, key, StringByteIterator.getByteIteratorMap(input)); + assertThat(sPut, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(1)); + + final Set<String> fld = new TreeSet<>(); + + final HashMap<String, ByteIterator> result1 = new HashMap<>(); + final Status sGet = client.read(TABLE_NAME, key, fld, result1); + assertThat(sGet, is(Status.OK)); + + final HashMap<String, String> strResult = new HashMap<String, String>(); + for (final Map.Entry<String, ByteIterator> e : result1.entrySet()) { + if (e.getValue() != null) { + strResult.put(e.getKey(), e.getValue().toString()); + } + } + assertThat(strResult, hasEntry("field0", "value1")); + assertThat(strResult, hasEntry("field1", "value2A")); + } + + @Test + public void testReadNotPresent() throws Exception { + cluster.cache(DEFAULT_CACHE_NAME).clear(); + final String key = "key"; + final Map<String, String> input = new HashMap<>(); + input.put("field0", "value1"); + input.put("field1", "value2A"); + input.put("field3", null); + final Status sPut = client.insert(TABLE_NAME, key, StringByteIterator.getByteIteratorMap(input)); + assertThat(sPut, is(Status.OK)); + assertThat(cluster.cache(DEFAULT_CACHE_NAME).size(), is(1)); + + final Set<String> fld = new TreeSet<>(); + + final String newKey = "newKey"; + final HashMap<String, ByteIterator> result1 = new HashMap<>(); + final Status sGet = client.read(TABLE_NAME, newKey, fld, result1); + assertThat(sGet, is(Status.NOT_FOUND)); + + } +} diff --git a/pom.xml b/pom.xml index 62dc6aea..008964d0 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,7 @@ LICENSE file. <hbase14.version>1.4.2</hbase14.version> <hbase20.version>2.0.0</hbase20.version> <hypertable.version>0.9.5.6</hypertable.version> + <ignite.version>2.5.0</ignite.version> <infinispan.version>7.2.2.Final</infinispan.version> <kudu.version>1.6.0</kudu.version> <maprhbase.version>1.1.8-mapr-1710</maprhbase.version> @@ -139,6 +140,7 @@ LICENSE file. <module>hbase14</module> <module>hbase20</module> <module>hypertable</module> + <module>ignite</module> <module>infinispan</module> <module>jdbc</module> <module>kudu</module> -- GitLab