diff --git a/.travis.yml b/.travis.yml
index 0726f500878634d24d6a85532c6f29814ad03565..84eeb0d0ec69e7d48b0b2503f5b8675009288aa8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -22,7 +22,6 @@ language: java
 jdk:
   - oraclejdk9
   - oraclejdk8
-  - openjdk7
 
 addons:
   hosts:
diff --git a/checkstyle.xml b/checkstyle.xml
index 6041ba4b71462808903a4225feccddce53c34afe..de1166cd38d73594f4d4e5297df84e88fc324645 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -72,6 +72,8 @@ LICENSE file.
     <module name="JavadocType">
       <property name="scope" value="public"/>
       <property name="allowMissingParamTags" value="true"/>
+      <!-- unfortunately we cannot add implNote, implSpec, apiNote and apiSpec to checkstyle -->
+      <property name="allowUnknownTags" value="true"/>
     </module>
     <module name="JavadocStyle"/>
diff --git a/core/src/main/java/com/yahoo/ycsb/TimeseriesDB.java b/core/src/main/java/com/yahoo/ycsb/TimeseriesDB.java
new file mode 100644
index 0000000000000000000000000000000000000000..10f217606c7222799822665edd1bce34edfa0e63
--- /dev/null
+++ b/core/src/main/java/com/yahoo/ycsb/TimeseriesDB.java
@@ -0,0 +1,336 @@
+/*
+ * Copyright (c) 2018 YCSB Contributors All rights reserved.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package com.yahoo.ycsb;
+
+import com.yahoo.ycsb.generator.Generator;
+import com.yahoo.ycsb.generator.IncrementingPrintableStringGenerator;
+import com.yahoo.ycsb.workloads.TimeSeriesWorkload;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Abstract class to adapt the default ycsb DB interface to Timeseries databases.
+ * This class is mostly here to be extended by the Timeseries database bindings
+ * originally developed by Andreas Bader in <a href="https://github.com/TSDBBench/YCSB-TS">YCSB-TS</a>.
+ * <p>
+ * This class mostly parses the workload information passed through the default ycsb interface
+ * according to the conventions outlined in {@link TimeSeriesWorkload}.
+ * It also contains some minor utility methods relevant to Timeseries databases.
+ * </p>
+ *
+ * @implSpec It's vital to call <tt>super.init()</tt> when overriding the init method
+ * to correctly initialize the workload parsing.
+ */
+public abstract class TimeseriesDB extends DB {
+
+  // defaults for downsampling; the default settings effectively disable it
+  private static final String DOWNSAMPLING_FUNCTION_PROPERTY_DEFAULT = "NONE";
+  private static final String DOWNSAMPLING_INTERVAL_PROPERTY_DEFAULT = "0";
+
+  // debug property loading
+  private static final String DEBUG_PROPERTY = "debug";
+  private static final String DEBUG_PROPERTY_DEFAULT = "false";
+
+  // test property loading
+  private static final String TEST_PROPERTY = "test";
+  private static final String TEST_PROPERTY_DEFAULT = "false";
+
+  // Workload parameters that we need in order to parse the workload-encoded fields
+  protected String timestampKey;
+  protected String valueKey;
+  protected String tagPairDelimiter;
+  protected String queryTimeSpanDelimiter;
+  protected String deleteDelimiter;
+  protected TimeUnit timestampUnit;
+  protected String groupByKey;
+  protected String downsamplingKey;
+  protected Integer downsamplingInterval;
+  protected AggregationOperation downsamplingFunction;
+
+  // YCSB-parameters
+  protected boolean debug;
+  protected boolean test;
+
+  /**
+   * Initialize any state for this DB.
+   * Called once per DB instance; there is one DB instance per client thread.
+   */
+  @Override
+  public void init() throws DBException {
+    // taken from BasicTSDB
+    timestampKey = getProperties().getProperty(
+        TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY,
+        TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT);
+    valueKey = getProperties().getProperty(
+        TimeSeriesWorkload.VALUE_KEY_PROPERTY,
+        TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT);
+    tagPairDelimiter = getProperties().getProperty(
+        TimeSeriesWorkload.PAIR_DELIMITER_PROPERTY,
+        TimeSeriesWorkload.PAIR_DELIMITER_PROPERTY_DEFAULT);
+    queryTimeSpanDelimiter = getProperties().getProperty(
+        TimeSeriesWorkload.QUERY_TIMESPAN_DELIMITER_PROPERTY,
+        TimeSeriesWorkload.QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT);
+    deleteDelimiter = getProperties().getProperty(
+        TimeSeriesWorkload.DELETE_DELIMITER_PROPERTY,
+        TimeSeriesWorkload.DELETE_DELIMITER_PROPERTY_DEFAULT);
+    timestampUnit = TimeUnit.valueOf(getProperties().getProperty(
+        TimeSeriesWorkload.TIMESTAMP_UNITS_PROPERTY,
+        TimeSeriesWorkload.TIMESTAMP_UNITS_PROPERTY_DEFAULT));
+    groupByKey = getProperties().getProperty(
+        TimeSeriesWorkload.GROUPBY_KEY_PROPERTY,
+        TimeSeriesWorkload.GROUPBY_KEY_PROPERTY_DEFAULT);
+    downsamplingKey = getProperties().getProperty(
+        TimeSeriesWorkload.DOWNSAMPLING_KEY_PROPERTY,
+        TimeSeriesWorkload.DOWNSAMPLING_KEY_PROPERTY_DEFAULT);
+    downsamplingFunction = TimeseriesDB.AggregationOperation.valueOf(getProperties()
+        .getProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, DOWNSAMPLING_FUNCTION_PROPERTY_DEFAULT));
+    downsamplingInterval = Integer.valueOf(getProperties()
+        .getProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, DOWNSAMPLING_INTERVAL_PROPERTY_DEFAULT));
+
+    test = Boolean.parseBoolean(getProperties().getProperty(TEST_PROPERTY, TEST_PROPERTY_DEFAULT));
+    debug = Boolean.parseBoolean(getProperties().getProperty(DEBUG_PROPERTY, DEBUG_PROPERTY_DEFAULT));
+  }
+
+  @Override
+  public final Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) {
+    Map<String, List<String>> tagQueries = new HashMap<>();
+    Long timestamp = null;
+    for (String field : fields) {
+      if (field.startsWith(timestampKey)) {
+        String[] timestampParts = field.split(tagPairDelimiter);
+        if (timestampParts[1].contains(queryTimeSpanDelimiter)) {
+          // Since we're looking for a single datapoint, a range of timestamps makes no sense.
+          // As we cannot throw an exception to bail out here, we return `BAD_REQUEST` instead.
+          return Status.BAD_REQUEST;
+        }
+        timestamp = Long.valueOf(timestampParts[1]);
+      } else {
+        String[] queryParts = field.split(tagPairDelimiter);
+        tagQueries.computeIfAbsent(queryParts[0], k -> new ArrayList<>()).add(queryParts[1]);
+      }
+    }
+    if (timestamp == null) {
+      return Status.BAD_REQUEST;
+    }
+
+    return read(table, timestamp, tagQueries);
+  }
+
+  /**
+   * Read a record from the database.
+   *
+   * @param metric    The name of the metric
+   * @param timestamp The timestamp of the record to read.
+   * @param tags      The tags for which values are to be read (can be empty).
+   * @return A {@link Status} detailing the outcome of the read operation.
+   */
+  protected abstract Status read(String metric, long timestamp, Map<String, List<String>> tags);
+
+  /**
+   * {@inheritDoc}
+   *
+   * @implNote This method parses the information passed to it and subsequently passes it to the modified
+   * interface at {@link #scan(String, long, long, Map, AggregationOperation, int, TimeUnit)}
+   */
+  @Override
+  public final Status scan(String table, String startkey, int recordcount, Set<String> fields,
+                           Vector<HashMap<String, ByteIterator>> result) {
+    Map<String, List<String>> tagQueries = new HashMap<>();
+    TimeseriesDB.AggregationOperation aggregationOperation = TimeseriesDB.AggregationOperation.NONE;
+    Set<String> groupByFields = new HashSet<>();
+
+    boolean rangeSet = false;
+    long start = 0;
+    long end = 0;
+    for (String field : fields) {
+      if (field.startsWith(timestampKey)) {
+        String[] timestampParts = field.split(tagPairDelimiter);
+        if (!timestampParts[1].contains(queryTimeSpanDelimiter)) {
+          // For now we only support scanning over a timestamp range,
+          // not over a single timestamp.
+          // TODO: Support scans for a single timestamp
+          return Status.NOT_IMPLEMENTED;
+        }
+        String[] rangeParts = timestampParts[1].split(queryTimeSpanDelimiter);
+        rangeSet = true;
+        start = Long.valueOf(rangeParts[0]);
+        end = Long.valueOf(rangeParts[1]);
+      } else if (field.startsWith(groupByKey)) {
+        String groupBySpecifier = field.split(tagPairDelimiter)[1];
+        aggregationOperation = TimeseriesDB.AggregationOperation.valueOf(groupBySpecifier);
+      } else if (field.startsWith(downsamplingKey)) {
+        String downsamplingSpec = field.split(tagPairDelimiter)[1];
+        // the workload always encodes the configured downsampling function and interval here:
+        if (!downsamplingSpec.equals(downsamplingFunction.toString() + downsamplingInterval.toString())) {
+          System.err.println("Downsampling specification for Scan did not match configured downsampling");
+          return Status.BAD_REQUEST;
+        }
+      } else {
+        String[] queryParts = field.split(tagPairDelimiter);
+        if (queryParts.length == 1) {
+          // a field without a tag value is a group-by clause; warn that it is ignored
+          System.err.println("Grouping by arbitrary series is currently not supported");
+          groupByFields.add(field);
+        } else {
+          tagQueries.computeIfAbsent(queryParts[0], k -> new ArrayList<>()).add(queryParts[1]);
+        }
+      }
+    }
+    if (!rangeSet) {
+      return Status.BAD_REQUEST;
+    }
+    return scan(table, start, end, tagQueries, downsamplingFunction, downsamplingInterval, timestampUnit);
+  }
+
+  /**
+   * Perform a range scan for a set of records in the database.
+   *
+   * @param metric    The name of the metric
+   * @param startTs   The timestamp of the first record to read.
+   * @param endTs     The timestamp of the last record to read.
+   * @param tags      The tags for which values are to be read (can be empty).
+   * @param aggreg    The aggregation operation to perform.
+   * @param timeValue The width of the downsampling window, measured in {@code timeUnit}.
+   * @param timeUnit  The unit of time in which the downsampling window is measured.
+   * @return A {@link Status} detailing the outcome of the scan operation.
+   */
+  protected abstract Status scan(String metric, long startTs, long endTs, Map<String, List<String>> tags,
+                                 AggregationOperation aggreg, int timeValue, TimeUnit timeUnit);
+
+  @Override
+  public Status update(String table, String key, Map<String, ByteIterator> values) {
+    // not supportable for general TSDBs
+    // can be explicitly overwritten in inheriting classes
+    return Status.NOT_IMPLEMENTED;
+  }
+
+  @Override
+  public final Status insert(String table, String key, Map<String, ByteIterator> values) {
+    NumericByteIterator tsContainer = (NumericByteIterator) values.remove(timestampKey);
+    NumericByteIterator valueContainer = (NumericByteIterator) values.remove(valueKey);
+    if (valueContainer.isFloatingPoint()) {
+      return insert(table, tsContainer.getLong(), valueContainer.getDouble(), values);
+    } else {
+      return insert(table, tsContainer.getLong(), valueContainer.getLong(), values);
+    }
+  }
+
+  /**
+   * Insert a record into the database. Any tag/tagvalue pairs in the specified tag map and the given value
+   * will be written into the record with the specified timestamp.
+   *
+   * @param metric    The name of the metric
+   * @param timestamp The timestamp of the record to insert.
+   * @param value     The actual value to insert.
+   * @param tags      A Map of tag/tagvalue pairs to insert as tags.
+   * @return A {@link Status} detailing the outcome of the insert
+   */
+  protected abstract Status insert(String metric, long timestamp, long value, Map<String, ByteIterator> tags);
+
+  /**
+   * Insert a record into the database. Any tag/tagvalue pairs in the specified tag map and the given value
+   * will be written into the record with the specified timestamp.
+   *
+   * @param metric    The name of the metric
+   * @param timestamp The timestamp of the record to insert.
+   * @param value     The actual value to insert.
+   * @param tags      A Map of tag/tagvalue pairs to insert as tags.
+   * @return A {@link Status} detailing the outcome of the insert
+   */
+  protected abstract Status insert(String metric, long timestamp, double value, Map<String, ByteIterator> tags);
+
+  /**
+   * NOTE: This operation is usually <b>not</b> supported for Time-Series databases.
+   * Deletion of data is often instead regulated through automatic cleanup and "retention policies" or similar.
+   *
+   * @return Status.NOT_IMPLEMENTED or a {@link Status} specifying the outcome of deletion
+   * in case the operation is supported.
+   */
+  public Status delete(String table, String key) {
+    return Status.NOT_IMPLEMENTED;
+  }
+
+  /**
+   * Examines the given {@link Properties} and returns an array containing the Tag Keys
+   * (basically matching column names for traditional Relational DBs) that are detailed in the workload specification.
+   * See {@link TimeSeriesWorkload} for how these are generated.
+   * <p>
+   * This method is intended to be called during the initialization phase to create a table schema
+   * for DBMS that require such a schema before values can be inserted (or queried).
+   *
+   * @param properties The properties detailing the workload configuration.
+   * @return An array of strings specifying all allowed TagKeys (or column names)
+   * except for the "value" and the "timestamp" column name.
+   * @implSpec WARNING: this method must exactly match how tagKeys are generated by the {@link TimeSeriesWorkload},
+   * otherwise databases requiring this information will most likely break!
+   */
+  protected static String[] getPossibleTagKeys(Properties properties) {
+    final int tagCount = Integer.parseInt(properties.getProperty(TimeSeriesWorkload.TAG_COUNT_PROPERTY,
+        TimeSeriesWorkload.TAG_COUNT_PROPERTY_DEFAULT));
+    final int tagKeyLength = Integer.parseInt(properties.getProperty(TimeSeriesWorkload.TAG_KEY_LENGTH_PROPERTY,
+        TimeSeriesWorkload.TAG_KEY_LENGTH_PROPERTY_DEFAULT));
+
+    Generator<String> tagKeyGenerator = new IncrementingPrintableStringGenerator(tagKeyLength);
+    String[] tagNames = new String[tagCount];
+    for (int i = 0; i < tagCount; i++) {
+      tagNames[i] = tagKeyGenerator.nextValue();
+    }
+    return tagNames;
+  }
+
+
+  /**
+   * An enum containing the possible aggregation operations.
+   * Not all of these operations are required to be supported by implementing classes.
+   * <p>
+   * Aggregations are applied when using the <tt>SCAN</tt> operation on a range of timestamps.
+   * That way the result set is reduced from multiple records into
+   * a single one, or one record for each group specified through <tt>GROUP BY</tt> clauses.
+   */
+  public enum AggregationOperation {
+    /**
+     * No aggregation whatsoever. Return the results as a full table.
+     */
+    NONE,
+    /**
+     * Sum the values of the matching records when calculating the value.
+     * GroupBy criteria apply where relevant for sub-summing.
+     */
+    SUM,
+    /**
+     * Calculate the arithmetic mean over the value across matching records when calculating the value.
+     * GroupBy criteria apply where relevant for group-targeted averages.
+     */
+    AVERAGE,
+    /**
+     * Count the number of matching records and return that as value.
+     * GroupBy criteria apply where relevant.
+     */
+    COUNT,
+    /**
+     * Return only the maximum of the matching record values.
+     * GroupBy criteria apply and result in group-based maxima.
+     */
+    MAX,
+    /**
+     * Return only the minimum of the matching record values.
+     * GroupBy criteria apply and result in group-based minima.
+     */
+    MIN;
+  }
+}
diff --git a/pom.xml b/pom.xml
index 6e7629df830360fb888a0da5c29c4cbb92b8c1c6..f9b24b535f00da5c773a0ea21b5a0c666a1d823c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-Copyright (c) 2012 - 2016 YCSB contributors. All rights reserved.
+<!--
+Copyright (c) 2012 - 2017 YCSB contributors. All rights reserved.
 
 Licensed under the Apache License, Version 2.0 (the "License"); you
 may not use this file except in compliance with the License. You
@@ -68,42 +68,44 @@ LICENSE file.
   <properties>
     <maven.assembly.version>2.5.5</maven.assembly.version>
     <maven.dependency.version>2.10</maven.dependency.version>
-    <asynchbase.version>1.7.1</asynchbase.version>
-    <hbase098.version>0.98.14-hadoop2</hbase098.version>
-    <hbase10.version>1.0.2</hbase10.version>
-    <hbase12.version>1.2.5</hbase12.version>
+
+    <!-- datastore binding versions, lex sorted -->
     <accumulo.1.6.version>1.6.6</accumulo.1.6.version>
     <accumulo.1.7.version>1.7.3</accumulo.1.7.version>
     <accumulo.1.8.version>1.8.1</accumulo.1.8.version>
+    <aerospike.version>3.1.2</aerospike.version>
+    <arangodb.version>2.7.3</arangodb.version>
+    <arangodb3.version>4.1.7</arangodb3.version>
+    <asynchbase.version>1.7.1</asynchbase.version>
+    <azuredocumentdb.version>1.8.1</azuredocumentdb.version>
+    <azurestorage.version>4.0.0</azurestorage.version>
     <cassandra.cql.version>3.0.0</cassandra.cql.version>
+    <cloudspanner.version>0.24.0-beta</cloudspanner.version>
+    <couchbase.version>1.4.10</couchbase.version>
+    <couchbase2.version>2.3.1</couchbase2.version>
+    <elasticsearch5-version>5.5.1</elasticsearch5-version>
     <geode.version>1.2.0</geode.version>
-    <azuredocumentdb.version>1.8.1</azuredocumentdb.version>
     <googlebigtable.version>0.9.7</googlebigtable.version>
+    <hbase098.version>0.98.14-hadoop2</hbase098.version>
+    <hbase10.version>1.0.2</hbase10.version>
+    <hbase12.version>1.2.5</hbase12.version>
+    <hypertable.version>0.9.5.6</hypertable.version>
     <infinispan.version>7.2.2.Final</infinispan.version>
     <kudu.version>1.1.0</kudu.version>
-    <openjpa.jdbc.version>2.1.1</openjpa.jdbc.version>
     <!--<mapkeeper.version>1.0</mapkeeper.version>-->
     <mongodb.version>3.0.3</mongodb.version>
     <mongodb.async.version>2.0.1</mongodb.async.version>
+    <openjpa.jdbc.version>2.1.1</openjpa.jdbc.version>
     <orientdb.version>2.2.10</orientdb.version>
-    <redis.version>2.0.0</redis.version>
-    <s3.version>1.10.20</s3.version>
-    <voldemort.version>0.81</voldemort.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <thrift.version>0.8.0</thrift.version>
-    <hypertable.version>0.9.5.6</hypertable.version>
-    <elasticsearch5-version>5.5.1</elasticsearch5-version>
-    <couchbase.version>1.4.10</couchbase.version>
-    <couchbase2.version>2.3.1</couchbase2.version>
-    <tarantool.version>1.6.5</tarantool.version>
+    <redis.version>2.0.0</redis.version>
     <riak.version>2.0.5</riak.version>
-    <aerospike.version>3.1.2</aerospike.version>
+    <s3.version>1.10.20</s3.version>
     <solr.version>5.5.3</solr.version>
     <solr6.version>6.4.1</solr6.version>
-    <arangodb.version>2.7.3</arangodb.version>
-    <arangodb3.version>4.1.7</arangodb3.version>
-    <azurestorage.version>4.0.0</azurestorage.version>
-    <cloudspanner.version>0.24.0-beta</cloudspanner.version>
+    <tarantool.version>1.6.5</tarantool.version>
+    <thrift.version>0.8.0</thrift.version>
+    <voldemort.version>0.81</voldemort.version>
   </properties>
 
   <modules>
@@ -180,7 +182,7 @@ LICENSE file.
             <requireMavenVersion>
               <version>3.1.0</version>
             </requireMavenVersion>
-             </rules>
+            </rules>
         </configuration>
       </execution>
     </executions>
@@ -190,8 +192,8 @@ LICENSE file.
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.7.0</version>
        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
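
Note for reviewers: below is a minimal sketch of how a concrete binding would extend the new `TimeseriesDB` abstraction. The class name `EchoTimeseriesDB` and its log-and-accept behaviour are purely illustrative and not part of this change; only the overridden signatures and the required `super.init()` call reflect the contract introduced here.

```java
package com.yahoo.ycsb;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Illustrative only: accepts every operation and echoes it when the "debug" property is set.
 */
public class EchoTimeseriesDB extends TimeseriesDB {

  @Override
  public void init() throws DBException {
    super.init(); // vital: initializes the workload-parsing fields (delimiters, keys, downsampling)
    // a real binding would open its client connection here
  }

  @Override
  protected Status read(String metric, long timestamp, Map<String, List<String>> tags) {
    if (debug) {
      System.out.println("READ " + metric + " @" + timestamp + " tags=" + tags);
    }
    return Status.OK; // a real binding would return NOT_FOUND when no datapoint matches
  }

  @Override
  protected Status scan(String metric, long startTs, long endTs, Map<String, List<String>> tags,
                        AggregationOperation aggreg, int timeValue, TimeUnit timeUnit) {
    if (debug) {
      System.out.println("SCAN " + metric + " [" + startTs + ".." + endTs + "] "
          + aggreg + " every " + timeValue + " " + timeUnit + " tags=" + tags);
    }
    return Status.OK;
  }

  @Override
  protected Status insert(String metric, long timestamp, long value, Map<String, ByteIterator> tags) {
    return Status.OK; // integer-valued datapoint
  }

  @Override
  protected Status insert(String metric, long timestamp, double value, Map<String, ByteIterator> tags) {
    return Status.OK; // floating-point datapoint
  }
}
```

Because `read`, `scan` and `insert` are declared `final` in `TimeseriesDB`, a binding only ever sees the already-parsed timestamp/tag form of each operation; `update` and `delete` remain overridable for stores that support them.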