Commit 035310c5 authored by Chris Larsen, committed by Sean Busbey

[core] Add the TimeSeriesWorkload class, a new type of workload that uses (#1008)

the existing YCSB API to generate time series data (numeric values associated
with timestamps).
See the TimeSeriesWorkload.java class Javadocs for details on how to use it
with a client, and workloads/tsworkload_template for the available parameters.
Also add the BasicTSDB binding for printing/debugging the workloads.
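For illustration, a minimal sketch of how a client binding might consume the generated fields in its insert() path, mirroring what BasicTSDB does below. The literal "YCSBTS" and "YCSBV" keys are assumptions based on the default 'timestampkey' and 'valuekey' properties; a real binding should look them up from the workload properties as BasicTSDB does. Tag key/value pairs come through as regular String fields, so only the timestamp and value fields need special handling.

// Sketch only: pull the timestamp, numeric value and tag pairs out of a
// TimeSeriesWorkload insert. "YCSBTS"/"YCSBV" are assumed defaults for the
// 'timestampkey'/'valuekey' properties.
public Status insert(String table, String key, Map<String, ByteIterator> values) {
  long timestamp = 0;
  double value = 0;
  Map<String, String> tags = new HashMap<String, String>();
  for (Map.Entry<String, ByteIterator> entry : values.entrySet()) {
    if (entry.getKey().equals("YCSBTS")) {
      timestamp = ((NumericByteIterator) entry.getValue()).getLong();
    } else if (entry.getKey().equals("YCSBV")) {
      NumericByteIterator it = (NumericByteIterator) entry.getValue();
      value = it.isFloatingPoint() ? it.getDouble() : it.getLong();
    } else {
      // remaining entries are tag key/value pairs for this series
      tags.put(entry.getKey(), entry.getValue().toString());
    }
  }
  // ... write (key, tags, timestamp, value) to the target time series store ...
  return Status.OK;
}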
@@ -33,6 +33,7 @@ arangodb3:com.yahoo.ycsb.db.arangodb.ArangoDB3Client
 azuredocumentdb:com.yahoo.ycsb.db.azuredocumentdb.AzureDocumentDBClient
 azuretablestorage:com.yahoo.ycsb.db.azuretablestorage.AzureClient
 basic:com.yahoo.ycsb.BasicDB
+basicts:com.yahoo.ycsb.BasicTSDB
 cassandra-cql:com.yahoo.ycsb.db.CassandraCQLClient
 cassandra2-cql:com.yahoo.ycsb.db.CassandraCQLClient
 cloudspanner:com.yahoo.ycsb.db.cloudspanner.CloudSpannerClient
......
@@ -57,6 +57,7 @@ DATABASES = {
     "arangodb3" : "com.yahoo.ycsb.db.arangodb.ArangoDB3Client",
     "asynchbase" : "com.yahoo.ycsb.db.AsyncHBaseClient",
     "basic" : "com.yahoo.ycsb.BasicDB",
+    "basicts" : "com.yahoo.ycsb.BasicTSDB",
     "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient",
     "cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient",
     "cloudspanner" : "com.yahoo.ycsb.db.cloudspanner.CloudSpannerClient",
@@ -269,8 +270,8 @@ def main():
         warn("Running against a source checkout. In order to get our runtime "
              "dependencies we'll have to invoke Maven. Depending on the state "
              "of your system, this may take ~30-45 seconds")
-        db_location = "core" if binding == "basic" else binding
-        project = "core" if binding == "basic" else binding + "-binding"
+        db_location = "core" if (binding == "basic" or binding == "basicts") else binding
+        project = "core" if (binding == "basic" or binding == "basicts") else binding + "-binding"
         db_dir = os.path.join(ycsb_home, db_location)
         # goes first so we can rely on side-effect of package
         maven_says = get_classpath_from_maven(project)
......
@@ -47,16 +47,16 @@ public class BasicDB extends DB {
   protected static Map<Integer, Integer> inserts;
   protected static Map<Integer, Integer> deletes;
 
-  private boolean verbose;
-  private boolean randomizedelay;
-  private int todelay;
+  protected boolean verbose;
+  protected boolean randomizedelay;
+  protected int todelay;
   protected boolean count;
 
   public BasicDB() {
     todelay = 0;
   }
 
-  private void delay() {
+  protected void delay() {
     if (todelay > 0) {
       long delayNs;
       if (randomizedelay) {
......
/**
* Copyright (c) 2017 YCSB contributors All rights reserved.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import com.yahoo.ycsb.workloads.TimeSeriesWorkload;
/**
* Basic DB for printing out time series workloads and/or tracking the distribution
* of keys and fields.
*/
public class BasicTSDB extends BasicDB {
/** Time series workload specific counters. */
protected static Map<Long, Integer> timestamps;
protected static Map<Integer, Integer> floats;
protected static Map<Integer, Integer> integers;
private String timestampKey;
private String valueKey;
private String tagPairDelimiter;
private String queryTimeSpanDelimiter;
private long lastTimestamp;
@Override
public void init() {
super.init();
synchronized (MUTEX) {
if (timestamps == null) {
timestamps = new HashMap<Long, Integer>();
floats = new HashMap<Integer, Integer>();
integers = new HashMap<Integer, Integer>();
}
}
timestampKey = getProperties().getProperty(
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY,
TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT);
valueKey = getProperties().getProperty(
TimeSeriesWorkload.VALUE_KEY_PROPERTY,
TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT);
tagPairDelimiter = getProperties().getProperty(
TimeSeriesWorkload.PAIR_DELIMITER_PROPERTY,
TimeSeriesWorkload.PAIR_DELIMITER_PROPERTY_DEFAULT);
queryTimeSpanDelimiter = getProperties().getProperty(
TimeSeriesWorkload.QUERY_TIMESPAN_DELIMITER_PROPERTY,
TimeSeriesWorkload.QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT);
}
@Override
public Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) {
delay();
if (verbose) {
StringBuilder sb = getStringBuilder();
sb.append("READ ").append(table).append(" ").append(key).append(" [ ");
if (fields != null) {
for (String f : fields) {
sb.append(f).append(" ");
}
} else {
sb.append("<all fields>");
}
sb.append("]");
System.out.println(sb);
}
if (count) {
Set<String> filtered = null;
if (fields != null) {
filtered = new HashSet<String>();
for (final String field : fields) {
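// Timestamp fields arrive as "<timestampkey><pair delimiter><timestamp>", e.g.
// "YCSBTS=1483228800", or as a start/end range such as
// "YCSBTS=1483228800,1483232400" when 'querytimespan' is set.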
if (field.startsWith(timestampKey)) {
String[] parts = field.split(tagPairDelimiter);
if (parts[1].contains(queryTimeSpanDelimiter)) {
parts = parts[1].split(queryTimeSpanDelimiter);
lastTimestamp = Long.parseLong(parts[0]);
} else {
lastTimestamp = Long.parseLong(parts[1]);
}
synchronized(timestamps) {
Integer ctr = timestamps.get(lastTimestamp);
if (ctr == null) {
timestamps.put(lastTimestamp, 1);
} else {
timestamps.put(lastTimestamp, ctr + 1);
}
}
} else {
filtered.add(field);
}
}
}
incCounter(reads, hash(table, key, filtered));
}
return Status.OK;
}
@Override
public Status update(String table, String key, Map<String, ByteIterator> values) {
delay();
boolean isFloat = false;
if (verbose) {
StringBuilder sb = getStringBuilder();
sb.append("UPDATE ").append(table).append(" ").append(key).append(" [ ");
if (values != null) {
final TreeMap<String, ByteIterator> tree = new TreeMap<String, ByteIterator>(values);
for (Map.Entry<String, ByteIterator> entry : tree.entrySet()) {
if (entry.getKey().equals(timestampKey)) {
sb.append(entry.getKey()).append("=")
.append(Utils.bytesToLong(entry.getValue().toArray())).append(" ");
} else if (entry.getKey().equals(valueKey)) {
final NumericByteIterator it = (NumericByteIterator) entry.getValue();
isFloat = it.isFloatingPoint();
sb.append(entry.getKey()).append("=")
.append(isFloat ? it.getDouble() : it.getLong()).append(" ");
} else {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
}
}
}
sb.append("]");
System.out.println(sb);
}
if (count) {
if (!verbose) {
isFloat = ((NumericByteIterator) values.get(valueKey)).isFloatingPoint();
}
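// Note: hash() extracts this record's timestamp into lastTimestamp as a side
// effect; the synchronized block below uses it to count writes per timestamp.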
int hash = hash(table, key, values);
incCounter(updates, hash);
synchronized(timestamps) {
Integer ctr = timestamps.get(lastTimestamp);
if (ctr == null) {
timestamps.put(lastTimestamp, 1);
} else {
timestamps.put(lastTimestamp, ctr + 1);
}
}
if (isFloat) {
incCounter(floats, hash);
} else {
incCounter(integers, hash);
}
}
return Status.OK;
}
@Override
public Status insert(String table, String key, Map<String, ByteIterator> values) {
delay();
boolean isFloat = false;
if (verbose) {
StringBuilder sb = getStringBuilder();
sb.append("INSERT ").append(table).append(" ").append(key).append(" [ ");
if (values != null) {
final TreeMap<String, ByteIterator> tree = new TreeMap<String, ByteIterator>(values);
for (Map.Entry<String, ByteIterator> entry : tree.entrySet()) {
if (entry.getKey().equals(timestampKey)) {
sb.append(entry.getKey()).append("=")
.append(Utils.bytesToLong(entry.getValue().toArray())).append(" ");
} else if (entry.getKey().equals(valueKey)) {
final NumericByteIterator it = (NumericByteIterator) entry.getValue();
isFloat = it.isFloatingPoint();
sb.append(entry.getKey()).append("=")
.append(isFloat ? it.getDouble() : it.getLong()).append(" ");
} else {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
}
}
}
sb.append("]");
System.out.println(sb);
}
if (count) {
if (!verbose) {
isFloat = ((NumericByteIterator) values.get(valueKey)).isFloatingPoint();
}
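// Note: hash() extracts this record's timestamp into lastTimestamp as a side
// effect; the synchronized block below uses it to count writes per timestamp.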
int hash = hash(table, key, values);
incCounter(inserts, hash);
synchronized(timestamps) {
Integer ctr = timestamps.get(lastTimestamp);
if (ctr == null) {
timestamps.put(lastTimestamp, 1);
} else {
timestamps.put(lastTimestamp, ctr + 1);
}
}
if (isFloat) {
incCounter(floats, hash);
} else {
incCounter(integers, hash);
}
}
return Status.OK;
}
@Override
public void cleanup() {
super.cleanup();
if (count && counter < 1) {
System.out.println("[TIMESTAMPS], Unique, " + timestamps.size());
System.out.println("[FLOATS], Unique series, " + floats.size());
System.out.println("[INTEGERS], Unique series, " + integers.size());
long minTs = Long.MAX_VALUE;
long maxTs = Long.MIN_VALUE;
for (final long ts : timestamps.keySet()) {
if (ts > maxTs) {
maxTs = ts;
}
if (ts < minTs) {
minTs = ts;
}
}
System.out.println("[TIMESTAMPS], Min, " + minTs);
System.out.println("[TIMESTAMPS], Max, " + maxTs);
}
}
@Override
protected int hash(final String table, final String key, final Map<String, ByteIterator> values) {
final TreeMap<String, ByteIterator> sorted = new TreeMap<String, ByteIterator>();
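// Skip the value field entirely and pull the timestamp out into lastTimestamp,
// so the hash identifies the time series itself (table + key + tag pairs).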
for (final Entry<String, ByteIterator> entry : values.entrySet()) {
if (entry.getKey().equals(valueKey)) {
continue;
} else if (entry.getKey().equals(timestampKey)) {
lastTimestamp = ((NumericByteIterator) entry.getValue()).getLong();
entry.getValue().reset();
continue;
}
sorted.put(entry.getKey(), entry.getValue());
}
// yeah it's ugly but gives us a unique hash without having to add hashers
// to all of the ByteIterators.
StringBuilder buf = new StringBuilder().append(table).append(key);
for (final Entry<String, ByteIterator> entry : sorted.entrySet()) {
entry.getValue().reset();
buf.append(entry.getKey())
.append(entry.getValue().toString());
}
return buf.toString().hashCode();
}
}
\ No newline at end of file
# Copyright (c) 2017 YCSB contributors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License. See accompanying
# LICENSE file.
# Yahoo! Cloud System Benchmark
# Time Series Workload Template: Default Values
#
# File contains all properties that can be set to define a
# YCSB session. All properties are set to their default
# value if one exists. If not, the property is commented
# out. When a property has a finite number of settings,
# the default is enabled and the alternates are shown in
# comments below it.
#
# Use of each property is explained through comments in Client.java,
# CoreWorkload.java, TimeSeriesWorkload.java or on the YCSB wiki page:
# https://github.com/brianfrankcooper/YCSB/wiki/TimeSeriesWorkload
# The name of the workload class to use. Always the following.
workload=com.yahoo.ycsb.workloads.TimeSeriesWorkload
# The number of records in the table to be inserted in
# the load phase or the number of records already in the
# table before the run phase. The default is Java's Long.MAX_VALUE.
recordcount=1000000
# The number of operations to use during the run phase. There is no
# default setting for operationcount; it must be set.
operationcount=3000000
# The number of insertions to do, if different from recordcount.
# Used with insertstart to grow an existing table.
#insertcount=
# ..::NOTE::.. This is different from the CoreWorkload!
# The starting timestamp of a run as a Unix Epoch numeral in the
# unit set in 'timestampunits'. This is used to determine the first
# timestamp when writing or querying, as well as how many timestamp
# offsets are available (based on 'timestampinterval').
#insertstart=
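# For example, insertstart=1483228800 with timestampunits=SECONDS would start
# the data at 2017-01-01T00:00:00Z.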
# The units represented by the 'insertstart' timestamp as well as
# durations such as 'timestampinterval', 'querytimespan', etc.
# For values, see https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TimeUnit.html
# Note that only seconds through nanoseconds are supported.
timestampunits=SECONDS
# The amount of time between each value in every time series in
# the units of 'timestampunits'.
timestampinterval=60
# ..::NOTE::.. This is different from the CoreWorkload!
# Represents the number of unique "metrics" or "keys" for time series.
# E.g. "sys.cpu" may be a single field or "metric" while there may be many
# time series sharing that key (perhaps a host tag with "web01" and "web02"
# as options).
fieldcount=16
# The number of characters in the "metric" or "key".
fieldlength=8
# --- TODO ---?
# The distribution used to choose the length of a field
fieldlengthdistribution=constant
#fieldlengthdistribution=uniform
#fieldlengthdistribution=zipfian
# The number of unique tag combinations for each time series. E.g.
# if this value is 4, each record will have a key and 4 tag pairs
# such as A=A, B=A, C=A, D=A.
tagcount=4
# The cardinality (number of unique values) of each tag value for
# every "metric" or field as a comma separated list. Each value must
# be a number from 1 to Java's Integer.MAX_VALUE and there must be
# 'tagcount' values. If more values than 'tagcount' are given, the
# extras are ignored; if fewer, 1 is substituted for the missing values.
tagcardinality=1,2,4,8
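# For example, with the defaults above the number of unique time series is
# fieldcount * (1 * 2 * 4 * 8) = 16 * 64 = 1,024.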
# The length of each tag key in characters.
tagkeylength=8
# The length of each tag value in characters.
tagvaluelength=8
# The character separating tag keys from tag values when reads, deletes
# or scans are executed against a database. The default is the equals sign
# so a field passed in a read to a DB may look like 'AA=AB'.
tagpairdelimiter==
# The delimiter between keys and tags when a delete is passed to the DB.
# E.g. if there was a key and a field, the request key would look like:
# 'AA:AA=AB'
deletedelimiter=:
# Whether or not to randomize the timestamp order when performing inserts
# and updates against a DB. By default all writes perform with the
# timestamps moving linearly forward in time once all time series for a
# given key have been written.
randomwritetimestamporder=false
# Whether or not to randomly shuffle the time series order when writing.
# This will shuffle the keys, tag keys and tag values.
# ************************************************************************
# WARNING - When this is enabled, reads and scans will likely return many
# empty results as invalid tag combinations will be chosen. Likewise
# this setting is INCOMPATIBLE with data integrity checks.
# ************************************************************************
randomtimeseriesorder=false
# The type of numerical data generated for each data point. The values are
# 64 bit signed integers, double precision floating points or a random mix.
# For data integrity, this setting is ignored and values are switched to
# 64 bit signed ints.
#valuetype=integers
valuetype=floats
#valuetype=mixed
# A value from 0 to 0.999999 representing how sparse each time series
# should be. The higher this value, the greater the time interval between
# values in a single series. For example, if sparsity is 0 and there are
# 10 time series with a 'timestampinterval' of 60 seconds with a total
# time range of 10 intervals, you would see 100 values written, one per
# timestamp interval per time series. If the sparsity is 0.50 then there
# would be only about 50 values written so some time series would have
# missing values at each interval.
sparsity=0.00
# The percentage of time series that are "lagging" behind the current
# timestamp of the writer. This is used to mimic a common behavior where
# most sources (agents, sensors, etc) are writing data in sync (same timestamp)
# but a subset are running behind due to buffering, latency issues, etc.
delayedSeries=0.10
# The maximum amount of delay for delayed series in interval counts. The
# actual delay is chosen based on a modulo of the series index.
delayedIntervals=5
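# For example, with delayedSeries=0.10 and delayedIntervals=5, roughly 10% of
# the series write timestamps that trail the leading edge by up to
# 5 * timestampinterval.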
# The fixed or maximum amount of time added to the start time of a
# read or scan operation to generate a query over a range of time
# instead of a single timestamp. Units are shared with 'timestampunits'.
# For example if the value is set to 3600 seconds (1 hour) then
# each read would pick a random start timestamp based on the
#'insertstart' value and number of intervals, then add 3600 seconds
# to create the end time of the query. If this value is 0 then reads
# will only provide a single timestamp.
# WARNING: Cannot be used with 'dataintegrity'.
querytimespan=0
# Whether or not reads should choose a random time span (aligned to
# the 'timestampinterval' value) for each read or scan request starting
# at 0 and reaching 'querytimespan' as the max.
queryrandomtimespan=false
# A delimiter character used to separate the start and end timestamps
# of a read query when 'querytimespan' is enabled.
querytimespandelimiter=,
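# For example, with querytimespan=3600 and SECONDS units, a read may be handed
# a field such as "YCSBTS=1483228800,1483232400" (assuming the default
# timestamp key), where the two values are the start and end of the range.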
# A unique key given to read, scan and delete operations when the
# operation should perform a group-by (multi-series aggregation) on one
# or more tags. If 'groupbyfunction' is set, this key will be given with
# the configured function.
groupbykey=YCSBGB
# A function name (e.g. 'sum', 'max' or 'avg') passed during reads,
# scans and deletes to cause the database to perform a group-by
# operation on one or more tags. If this value is empty or null
# (default), group-by operations are not performed.
#groupbyfunction=
# A comma separated list of 0s or 1s to denote which of the tag keys
# should be grouped during group-by operations. The number of values
# must match the number of tags in 'tagcount'.
#groupbykeys=0,0,1,1
# A unique key given to read and scan operations when the operation
# should downsample the results of a query into lower resolution
# data. If 'downsamplingfunction' is set, this key will be given with
# the configured function.
downsamplingkey=YCSBDS
# A function name (e.g. 'sum', 'max' or 'avg') passed during reads and
# scans to cause the database to perform a downsampling operation
# returning lower resolution data. If this value is empty or null
# (default), downsampling is not performed.
#downsamplingfunction=
# A time interval for which to downsample the raw data into. Shares
# the same units as 'timestampinterval'. This value must be greater
# than 'timestampinterval'. E.g. if the timestamp interval for raw
# data is 60 seconds, the downsampling interval could be 3600 seconds
# to roll up the data into 1 hour buckets.
#downsamplinginterval=
# What proportion of operations are reads
readproportion=0.10
# What proportion of operations are updates
updateproportion=0.00
# What proportion of operations are inserts
insertproportion=0.90
# The distribution of requests across the keyspace
requestdistribution=zipfian
#requestdistribution=uniform
#requestdistribution=latest
# The name of the database table to run queries against
table=usertable
# Whether or not data should be validated during writes and reads. If
# set then the data type is always a 64 bit signed integer and is the
# hash code of the key, timestamp and tags.
dataintegrity=false
# How the latency measurements are presented
measurementtype=histogram
#measurementtype=timeseries
#measurementtype=raw
# When measurementtype is set to raw, measurements will be output
# as RAW datapoints in the following csv format:
# "operation, timestamp of the measurement, latency in us"
#
# Raw datapoints are collected in-memory while the test is running. Each
# data point consumes about 50 bytes (including java object overhead).
# For a typical run of 1 million to 10 million operations, this should
# fit into memory most of the time. If you plan to do 100s of millions of
# operations per run, consider provisioning a machine with larger RAM when using
# the RAW measurement type, or split the run into multiple runs.
#
# Optionally, you can specify an output file to save raw datapoints.
# Otherwise, raw datapoints will be written to stdout.
# The output file will be appended to if it already exists, otherwise
# a new output file will be created.
#measurement.raw.output_file = /tmp/your_output_file_for_this_run
# JVM Reporting.
#
# Measure JVM information over time including GC counts, max and min memory
# used, max and min thread counts, max and min system load and others. This
# setting must be enabled in conjunction with the "-s" flag to run the status
# thread. Every "status.interval", the status thread will capture JVM
# statistics and record the results. At the end of the run, max and mins will
# be recorded.
# measurement.trackjvm = false
# The range of latencies to track in the histogram (milliseconds)
histogram.buckets=1000
# Granularity for time series (in milliseconds)
timeseries.granularity=1000
# Latency reporting.
#
# YCSB records latency of failed operations separately from successful ones.
# Latency of all OK operations will be reported under their operation name,
# such as [READ], [UPDATE], etc.
#
# For failed operations:
# By default we don't track latency numbers of specific error status.
# We just report latency of all failed operation under one measurement name
# such as [READ-FAILED]. But optionally, user can configure to have either:
# 1. Record and report latency for each and every error status code by
# setting reportLatencyForEachError to true, or
# 2. Record and report latency for a select set of error status codes by
# providing a CSV list of Status codes via the "latencytrackederrors"
# property.
# reportlatencyforeacherror=false
# latencytrackederrors="<comma separated strings of error codes>"
# Copyright (c) 2017 YCSB contributors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License. See accompanying
# LICENSE file.
# Yahoo! Cloud System Benchmark
# Workload A: Small cardinality consistent data for 2 days
# Application example: Typical monitoring of a single compute or small
# sensor station where 90% of the load is write and only 10% is read
# (it's usually much less). All writes are inserts. No sparsity so
# every series will have a value at every timestamp.
#
# Read/insert ratio: 10/90
# Cardinality: 16 per key (field), 64 fields for a total of 1,024
# time series.
workload=com.yahoo.ycsb.workloads.TimeSeriesWorkload
recordcount=1474560
operationcount=2949120
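# Note: 1,024 series * 1,440 intervals (one day at 60-second intervals)
# = 1,474,560 records loaded; 1,024 * 2,880 (two days) = 2,949,120 operations.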
fieldlength=8
fieldcount=64
tagcount=4
tagcardinality=1,2,4,2
sparsity=0.0
delayedSeries=0.0
delayedIntervals=0
timestampunits=SECONDS
timestampinterval=60
querytimespan=3600
readproportion=0.10
updateproportion=0.00
insertproportion=0.90