diff --git a/bin/bindings.properties b/bin/bindings.properties
index b37973905920a0be90734c5d1bc57c497d8f9b34..1a21407f628072050715f259660dcdcc20c02133 100644
--- a/bin/bindings.properties
+++ b/bin/bindings.properties
@@ -33,6 +33,7 @@ arangodb3:com.yahoo.ycsb.db.arangodb.ArangoDB3Client
 azuredocumentdb:com.yahoo.ycsb.db.azuredocumentdb.AzureDocumentDBClient
 azuretablestorage:com.yahoo.ycsb.db.azuretablestorage.AzureClient
 basic:com.yahoo.ycsb.BasicDB
+basicts:com.yahoo.ycsb.BasicTSDB
 cassandra-cql:com.yahoo.ycsb.db.CassandraCQLClient
 cassandra2-cql:com.yahoo.ycsb.db.CassandraCQLClient
 cloudspanner:com.yahoo.ycsb.db.cloudspanner.CloudSpannerClient
diff --git a/bin/ycsb b/bin/ycsb
index 7fb7518024d9fdfff321c72e0debc550a853fcb3..59ad00dd0f74ff9ab6d014a38b8453e8907607d6 100755
--- a/bin/ycsb
+++ b/bin/ycsb
@@ -57,6 +57,7 @@ DATABASES = {
     "arangodb3"    : "com.yahoo.ycsb.db.arangodb.ArangoDB3Client",
     "asynchbase"   : "com.yahoo.ycsb.db.AsyncHBaseClient",
     "basic"        : "com.yahoo.ycsb.BasicDB",
+    "basicts"      : "com.yahoo.ycsb.BasicTSDB",
     "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient",
     "cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient",
     "cloudspanner" : "com.yahoo.ycsb.db.cloudspanner.CloudSpannerClient",
@@ -269,8 +270,8 @@ def main():
         warn("Running against a source checkout. In order to get our runtime "
              "dependencies we'll have to invoke Maven. Depending on the state "
              "of your system, this may take ~30-45 seconds")
-        db_location = "core" if binding == "basic" else binding
-        project = "core" if binding == "basic" else binding + "-binding"
+        db_location = "core" if (binding == "basic" or binding == "basicts") else binding
+        project = "core" if (binding == "basic" or binding == "basicts") else binding + "-binding"
         db_dir = os.path.join(ycsb_home, db_location)
         # goes first so we can rely on side-effect of package
         maven_says = get_classpath_from_maven(project)
diff --git a/core/src/main/java/com/yahoo/ycsb/BasicDB.java b/core/src/main/java/com/yahoo/ycsb/BasicDB.java
index 9e483f21477bfa8e66048c1de3810b11722f525f..15d1101053c27ffd7228c21ff881fb44a77230de 100644
--- a/core/src/main/java/com/yahoo/ycsb/BasicDB.java
+++ b/core/src/main/java/com/yahoo/ycsb/BasicDB.java
@@ -47,16 +47,16 @@ public class BasicDB extends DB {
   protected static Map<Integer, Integer> inserts;
   protected static Map<Integer, Integer> deletes;
   
-  private boolean verbose;
-  private boolean randomizedelay;
-  private int todelay;
+  protected boolean verbose;
+  protected boolean randomizedelay;
+  protected int todelay;
   protected boolean count;
 
   public BasicDB() {
     todelay = 0;
   }
 
-  private void delay() {
+  protected void delay() {
     if (todelay > 0) {
       long delayNs;
       if (randomizedelay) {
diff --git a/core/src/main/java/com/yahoo/ycsb/BasicTSDB.java b/core/src/main/java/com/yahoo/ycsb/BasicTSDB.java
new file mode 100644
index 0000000000000000000000000000000000000000..42117ea9e9be50bdf8e3493d5fb8a73b54941087
--- /dev/null
+++ b/core/src/main/java/com/yahoo/ycsb/BasicTSDB.java
@@ -0,0 +1,273 @@
+/**
+ * Copyright (c) 2017 YCSB contributors All rights reserved.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package com.yahoo.ycsb;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import com.yahoo.ycsb.workloads.TimeSeriesWorkload;
+
+/**
+ * Basic DB for printing out time series workloads and/or tracking the distribution
+ * of keys and fields.
+ */
+public class BasicTSDB extends BasicDB {
+
+  /** Time series workload specific counters. */
+  protected static Map<Long, Integer> timestamps;
+  protected static Map<Integer, Integer> floats;
+  protected static Map<Integer, Integer> integers;
+  
+  private String timestampKey;
+  private String valueKey;
+  private String tagPairDelimiter;
+  private String queryTimeSpanDelimiter;
+  private long lastTimestamp;
+  
+  @Override
+  public void init() {
+    super.init();
+    
+    synchronized (MUTEX) {
+      if (timestamps == null) {
+        timestamps = new HashMap<Long, Integer>();
+        floats = new HashMap<Integer, Integer>();
+        integers = new HashMap<Integer, Integer>();
+      }
+    }
+    
+    timestampKey = getProperties().getProperty(
+        TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY, 
+        TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT);
+    valueKey = getProperties().getProperty(
+        TimeSeriesWorkload.VALUE_KEY_PROPERTY, 
+        TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT);
+    tagPairDelimiter = getProperties().getProperty(
+        TimeSeriesWorkload.PAIR_DELIMITER_PROPERTY, 
+        TimeSeriesWorkload.PAIR_DELIMITER_PROPERTY_DEFAULT);
+    queryTimeSpanDelimiter = getProperties().getProperty(
+        TimeSeriesWorkload.QUERY_TIMESPAN_DELIMITER_PROPERTY,
+        TimeSeriesWorkload.QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT);
+  }
+  
+  @Override
+  public Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) {
+    delay();
+    
+    if (verbose) {
+      StringBuilder sb = getStringBuilder();
+      sb.append("READ ").append(table).append(" ").append(key).append(" [ ");
+      if (fields != null) {
+        for (String f : fields) {
+          sb.append(f).append(" ");
+        }
+      } else {
+        sb.append("<all fields>");
+      }
+
+      sb.append("]");
+      System.out.println(sb);
+    }
+
+    if (count) {
+      Set<String> filtered = null;
+      if (fields != null) {
+        filtered = new HashSet<String>();
+        for (final String field : fields) {
+          if (field.startsWith(timestampKey)) {
+            String[] parts = field.split(tagPairDelimiter);
+            if (parts[1].contains(queryTimeSpanDelimiter)) {
+              parts = parts[1].split(queryTimeSpanDelimiter);
+              lastTimestamp = Long.parseLong(parts[0]);
+            } else {
+              lastTimestamp = Long.parseLong(parts[1]);
+            }
+            synchronized(timestamps) {
+              Integer ctr = timestamps.get(lastTimestamp);
+              if (ctr == null) {
+                timestamps.put(lastTimestamp, 1);
+              } else {
+                timestamps.put(lastTimestamp, ctr + 1);
+              }
+            }
+          } else {
+            filtered.add(field);
+          }
+        }
+      }
+      incCounter(reads, hash(table, key, filtered));
+    }
+    return Status.OK;
+  }
+  
+  @Override
+  public Status update(String table, String key, Map<String, ByteIterator> values) {
+    delay();
+
+    boolean isFloat = false;
+    
+    if (verbose) {
+      StringBuilder sb = getStringBuilder();
+      sb.append("UPDATE ").append(table).append(" ").append(key).append(" [ ");
+      if (values != null) {
+        final TreeMap<String, ByteIterator> tree = new TreeMap<String, ByteIterator>(values);
+        for (Map.Entry<String, ByteIterator> entry : tree.entrySet()) {
+          if (entry.getKey().equals(timestampKey)) {
+            sb.append(entry.getKey()).append("=")
+              .append(Utils.bytesToLong(entry.getValue().toArray())).append(" ");
+          } else if (entry.getKey().equals(valueKey)) {
+            final NumericByteIterator it = (NumericByteIterator) entry.getValue();
+            isFloat = it.isFloatingPoint();
+            sb.append(entry.getKey()).append("=")
+               .append(isFloat ? it.getDouble() : it.getLong()).append(" ");
+          } else {
+            sb.append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
+          }
+        }
+      }
+      sb.append("]");
+      System.out.println(sb);
+    }
+
+    if (count) {
+      if (!verbose) {
+        isFloat = ((NumericByteIterator) values.get(valueKey)).isFloatingPoint();
+      }
+      int hash = hash(table, key, values);
+      incCounter(updates, hash);
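+      // hash() extracted the data point's timestamp into lastTimestamp as a side effect,
+      // so count that timestamp here.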
+      synchronized(timestamps) {
+        Integer ctr = timestamps.get(lastTimestamp);
+        if (ctr == null) {
+          timestamps.put(lastTimestamp, 1);
+        } else {
+          timestamps.put(lastTimestamp, ctr + 1);
+        }
+      }
+      if (isFloat) {
+        incCounter(floats, hash);
+      } else {
+        incCounter(integers, hash);
+      }
+    }
+    
+    return Status.OK;
+  }
+
+  @Override
+  public Status insert(String table, String key, Map<String, ByteIterator> values) {
+    delay();
+    
+    boolean isFloat = false;
+    
+    if (verbose) {
+      StringBuilder sb = getStringBuilder();
+      sb.append("INSERT ").append(table).append(" ").append(key).append(" [ ");
+      if (values != null) {
+        final TreeMap<String, ByteIterator> tree = new TreeMap<String, ByteIterator>(values);
+        for (Map.Entry<String, ByteIterator> entry : tree.entrySet()) {
+          if (entry.getKey().equals(timestampKey)) {
+            sb.append(entry.getKey()).append("=")
+              .append(Utils.bytesToLong(entry.getValue().toArray())).append(" ");
+          } else if (entry.getKey().equals(valueKey)) {
+            final NumericByteIterator it = (NumericByteIterator) entry.getValue();
+            isFloat = it.isFloatingPoint();
+            sb.append(entry.getKey()).append("=")
+              .append(isFloat ? it.getDouble() : it.getLong()).append(" ");
+          } else {
+            sb.append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
+          }
+        }
+      }
+      sb.append("]");
+      System.out.println(sb);
+    }
+
+    if (count) {
+      if (!verbose) {
+        isFloat = ((NumericByteIterator) values.get(valueKey)).isFloatingPoint();
+      }
+      int hash = hash(table, key, values);
+      incCounter(inserts, hash);
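+      // hash() extracted the data point's timestamp into lastTimestamp as a side effect,
+      // so count that timestamp here.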
+      synchronized(timestamps) {
+        Integer ctr = timestamps.get(lastTimestamp);
+        if (ctr == null) {
+          timestamps.put(lastTimestamp, 1);
+        } else {
+          timestamps.put(lastTimestamp, ctr + 1);
+        }
+      }
+      if (isFloat) {
+        incCounter(floats, hash);
+      } else {
+        incCounter(integers, hash);
+      }
+    }
+
+    return Status.OK;
+  }
+
+  @Override
+  public void cleanup() {
+    super.cleanup();
+    if (count && counter < 1) {
+      System.out.println("[TIMESTAMPS], Unique, " + timestamps.size());
+      System.out.println("[FLOATS], Unique series, " + floats.size());
+      System.out.println("[INTEGERS], Unique series, " + integers.size());
+      
+      long minTs = Long.MAX_VALUE;
+      long maxTs = Long.MIN_VALUE;
+      for (final long ts : timestamps.keySet()) {
+        if (ts > maxTs) {
+          maxTs = ts;
+        }
+        if (ts < minTs) {
+          minTs = ts;
+        }
+      }
+      System.out.println("[TIMESTAMPS], Min, " + minTs);
+      System.out.println("[TIMESTAMPS], Max, " + maxTs);
+    }
+  }
+  
+  @Override
+  protected int hash(final String table, final String key, final Map<String, ByteIterator> values) {
+    final TreeMap<String, ByteIterator> sorted = new TreeMap<String, ByteIterator>();
+    for (final Entry<String, ByteIterator> entry : values.entrySet()) {
+      if (entry.getKey().equals(valueKey)) {
+        continue;
+      } else if (entry.getKey().equals(timestampKey)) {
+        lastTimestamp = ((NumericByteIterator) entry.getValue()).getLong();
+        entry.getValue().reset();
+        continue;
+      }
+      sorted.put(entry.getKey(), entry.getValue());
+    }
+    // yeah it's ugly but gives us a unique hash without having to add hashers
+    // to all of the ByteIterators.
+    StringBuilder buf = new StringBuilder().append(table).append(key);
+    for (final Entry<String, ByteIterator> entry : sorted.entrySet()) {
+      entry.getValue().reset();
+      buf.append(entry.getKey())
+         .append(entry.getValue().toString());
+    }
+    return buf.toString().hashCode();
+  }
+  
+}
\ No newline at end of file
diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java
new file mode 100644
index 0000000000000000000000000000000000000000..3a637eebd149e52b4e3935ec73accae8eacd0a59
--- /dev/null
+++ b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java
@@ -0,0 +1,1286 @@
+/**
+ * Copyright (c) 2017 YCSB contributors All rights reserved.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package com.yahoo.ycsb.workloads;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.Client;
+import com.yahoo.ycsb.DB;
+import com.yahoo.ycsb.NumericByteIterator;
+import com.yahoo.ycsb.Status;
+import com.yahoo.ycsb.StringByteIterator;
+import com.yahoo.ycsb.Utils;
+import com.yahoo.ycsb.Workload;
+import com.yahoo.ycsb.WorkloadException;
+import com.yahoo.ycsb.generator.DiscreteGenerator;
+import com.yahoo.ycsb.generator.Generator;
+import com.yahoo.ycsb.generator.HotspotIntegerGenerator;
+import com.yahoo.ycsb.generator.IncrementingPrintableStringGenerator;
+import com.yahoo.ycsb.generator.NumberGenerator;
+import com.yahoo.ycsb.generator.RandomDiscreteTimestampGenerator;
+import com.yahoo.ycsb.generator.ScrambledZipfianGenerator;
+import com.yahoo.ycsb.generator.SequentialGenerator;
+import com.yahoo.ycsb.generator.UniformLongGenerator;
+import com.yahoo.ycsb.generator.UnixEpochTimestampGenerator;
+import com.yahoo.ycsb.generator.ZipfianGenerator;
+import com.yahoo.ycsb.measurements.Measurements;
+
+/**
+ * A specialized workload dealing with time series data, i.e. series of discrete
+ * events associated with timestamps and identifiers. For this workload, identities
+ * consist of a {@link String} <b>key</b> and a set of {@link String} <b>tag key/value</b>
+ * pairs. 
+ * <p>
+ * For example:
+ * <table border="1">
+ * <tr><th>Time Series Key</th><th>Tag Keys/Values</th><th>1483228800</th><th>1483228860</th><th>1483228920</th></tr>
+ * <tr><td>AA</td><td>AA=AA, AB=AA</td><td>42.5</td><td>1.0</td><td>85.9</td></tr>
+ * <tr><td>AA</td><td>AA=AA, AB=AB</td><td>-9.4</td><td>76.9</td><td>0.18</td></tr>
+ * <tr><td>AB</td><td>AA=AA, AB=AA</td><td>-93.0</td><td>57.1</td><td>-63.8</td></tr>
+ * <tr><td>AB</td><td>AA=AA, AB=AB</td><td>7.6</td><td>56.1</td><td>-0.3</td></tr>
+ * </table>
+ * <p>
+ * This table shows four time series, each with three measurements at three different timestamps.
+ * Keys, tags, timestamps and values (numeric only at this time) are generated by
+ * this workload. For details on properties and behavior, see the
+ * {@code workloads/tsworkload_template} file. The Javadocs will focus on implementation
+ * and how {@link DB} clients can parse the workload.
+ * <p>
+ * In order to avoid having existing DB implementations implement a brand new interface
+ * this workload uses the existing APIs to encode a few special values that can be parsed
+ * by the client. The special values include the timestamp, numeric value and some
+ * query (read or scan) parameters. As an example of how to parse the fields, see
+ * {@link com.yahoo.ycsb.BasicTSDB}.
+ * <p>
+ * <b>Timestamps</b>
+ * <p>
+ * Timestamps are presented as Unix Epoch values in units of {@link TimeUnit#SECONDS}
+ * or {@link TimeUnit#MILLISECONDS} based on the {@code timestampunits} property
+ * (nanosecond and microsecond units are rejected at initialization). For calls to
+ * {@link DB#insert(String, String, java.util.Map)}
+ * and {@link DB#update(String, String, java.util.Map)}, the timestamp is added to the 
+ * {@code values} map encoded in a {@link NumericByteIterator} with the key defined
+ * in the {@code timestampkey} property (defaulting to "YCSBTS"). To pull out the timestamp
+ * when iterating over the values map, cast the {@link ByteIterator} to a 
+ * {@link NumericByteIterator} and call {@link NumericByteIterator#getLong()}.
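+ * <p>
+ * For example, a client could pull the timestamp out of the values map roughly like
+ * this (a minimal sketch, assuming {@code timestampKey} holds the configured property
+ * value; the iterator is reset afterwards in case it is read again, as {@code BasicTSDB} does):
+ * <pre>{@code
+ * ByteIterator raw = values.get(timestampKey);
+ * long timestamp = ((NumericByteIterator) raw).getLong();
+ * raw.reset();
+ * }</pre>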
+ * <p>
+ * Note that for calls to {@link DB#update(String, String, java.util.Map)}, timestamps
+ * earlier than the timestamp generator's current timestamp will be chosen at random to
+ * mimic a lambda architecture or an old job re-reporting some data.
+ * <p>
+ * For calls to {@link DB#read(String, String, java.util.Set, java.util.Map)} and 
+ * {@link DB#scan(String, String, int, java.util.Set, Vector)}, timestamps
+ * are encoded in a {@link StringByteIterator} in a key/value format with the 
+ * {@code tagpairdelimiter} separator, e.g. {@code YCSBTS=1483228800}. If {@code querytimespan}
+ * has been set to a positive value then the value will include a range with the
+ * starting (oldest) timestamp followed by the {@code querytimespandelimiter} separator
+ * and the ending (most recent) timestamp, e.g. {@code YCSBTS=1483228800,1483228920} with
+ * the default comma delimiter.
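+ * <p>
+ * A minimal sketch of parsing such a field, mirroring what {@code BasicTSDB} does
+ * (assuming {@code field} is one entry of the {@code fields} set and that the
+ * delimiter variables hold the configured property values):
+ * <pre>{@code
+ * if (field.startsWith(timestampKey)) {
+ *   String[] parts = field.split(tagPairDelimiter);
+ *   if (parts[1].contains(queryTimeSpanDelimiter)) {
+ *     String[] range = parts[1].split(queryTimeSpanDelimiter);
+ *     long startTs = Long.parseLong(range[0]);
+ *     long endTs = Long.parseLong(range[1]);
+ *   } else {
+ *     long timestamp = Long.parseLong(parts[1]);
+ *   }
+ * }
+ * }</pre>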
+ * <p>
+ * For calls to {@link DB#delete(String, String)}, encoding is the same as reads and 
+ * scans but key/value pairs are separated by the {@code deletedelimiter} property value.
+ * <p>
+ * By default, the starting timestamp is the current system time without any rounding.
+ * All timestamps are then offsets from that starting value.
+ * <p>
+ * <b>Values</b>
+ * <p>
+ * Similar to timestamps, values are encoded in {@link NumericByteIterator}s and stored
+ * in the values map with the key defined in {@code valuekey} (defaulting to "YCSBV").
+ * Values can either be 64 bit signed {@code long}s or double precision {@code double}s
+ * depending on the {@code valuetype} or {@code dataintegrity} properties. When parsing
+ * out the value, always call {@link NumericByteIterator#isFloatingPoint()} to determine
+ * whether or not to call {@link NumericByteIterator#getDouble()} (true) or 
+ * {@link NumericByteIterator#getLong()} (false). 
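+ * <p>
+ * A short sketch of reading the value out of an insert or update call (assuming
+ * {@code valueKey} holds the configured property value):
+ * <pre>{@code
+ * NumericByteIterator it = (NumericByteIterator) values.get(valueKey);
+ * if (it.isFloatingPoint()) {
+ *   double asDouble = it.getDouble();
+ * } else {
+ *   long asLong = it.getLong();
+ * }
+ * }</pre>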
+ * <p>
+ * When {@code dataintegrity} is set to true, then the value is always set to a 
+ * 64 bit signed integer which is the Java hash code of the concatenation of the
+ * key and map of values (sorted on the map keys and skipping the timestamp and value
+ * entries) XOR'd with the timestamp of the data point. See 
+ * {@link #validationFunction(String, long, TreeMap)} for the implementation.
+ * <p>
+ * <b>Keys and Tags</b>
+ * <p>
+ * As mentioned, the workload generates strings for the keys and tags. On initialization
+ * three string generators are created using the {@link IncrementingPrintableStringGenerator} 
+ * implementation. Then the generators fill three arrays with values based on the
+ * number of keys, the number of tags and the cardinality of each tag key/value pair.
+ * This implementation gives us time series like the example table where every string
+ * starts at something like "AA" (depending on the length of keys, tag keys and tag values)
+ * and continuing to "ZZ" wherein they rollover back to "AA". 
+ * <p>
+ * Each time series must have a unique set of tag keys, i.e. the key "AA" cannot appear
+ * more than once per time series. If the workload is configured for four tags with a
+ * tag key length of 2, the keys would be "AA", "AB", "AC" and "AD". 
+ * <p>
+ * Each tag key is then associated with a tag value. Tag values may appear more than once
+ * in each time series. E.g. time series will usually start with the tags "AA=AA", 
+ * "AB=AA", "AC=AA" and "AD=AA". The {@code tagcardinality} property determines how many
+ * unique values will be generated per tag key. In the example table above, the 
+ * {@code tagcardinality} property would have been set to {@code 1,2} meaning tag 
+ * key "AA" would always have the tag value "AA" given a cardinality of 1. However 
+ * tag key "AB" would have values "AA" and "AB" due to a cardinality of 2. This 
+ * cardinality map, along with the number of unique time series keys determines how 
+ * many unique time series are generated for the workload. Tag values share a common
+ * array of generated strings to save on memory.
+ * <p>
+ * <b>Operation Order</b>
+ * <p>
+ * The default behavior of the workload (for inserts and updates) is to generate a
+ * value for each time series for a given timestamp before incrementing to the next
+ * timestamp and writing values. This is an ideal workload and some time series 
+ * databases are designed for this behavior. However, in the real world, events will
+ * arrive grouped close to the current system time, with a number of events being
+ * delayed so that their timestamps are further in the past. The {@code delayedseries}
+ * property determines the percentage of time series that are delayed by up to
+ * {@code delayedintervals} intervals. E.g. setting this value to 0.05 means that 
+ * 5% of the time series will be written with timestamps earlier than the timestamp
+ * generator's current time.
+ * <p>
+ * <b>Reads and Scans</b>
+ * <p>
+ * For benchmarking queries, some common tasks implemented by almost every time series
+ * database are available and are passed in the fields {@link Set}:
+ * <p>
+ * <b>GroupBy</b> - A common operation is to aggregate multiple time series into a
+ * single time series via common parameters. For example, a user may want to see the
+ * total network traffic in a data center so they'll issue a SQL query like:
+ * <code>SELECT datacenter, SUM(value) FROM timeseriesdb GROUP BY datacenter;</code>
+ * If the {@code groupbyfunction} has been set to a group by function, then the fields
+ * will contain a key/value pair with the key set in {@code groupbykey}. E.g.
+ * {@code YCSBGB=SUM}.
+ * <p>
+ * Additionally with grouping enabled, fields on tag keys where group bys should 
+ * occur will only have the key defined and will not have a value or delimiter. E.g.
+ * if grouping on tag key "AA", the field will contain {@code AA} instead of {@code AA=AB}.
+ * <p>
+ * <b>Downsampling</b> - Another common operation is to reduce the resolution of the
+ * queried time series when fetching a wide time range of data so fewer data points 
+ * are returned. For example, a user may fetch a week of data but if the data is
+ * recorded on a 1 second interval, that would be over 600k data points so they
+ * may ask for a 1 hour downsampling (also called bucketing) wherein every hour, all
+ * of the data points for a "bucket" are aggregated into a single value. 
+ * <p>
+ * To enable downsampling, the {@code downsamplingfunction} property must be set to
+ * a supported function such as "SUM" and the {@code downsamplinginterval} must be
+ * set to a valid time interval with the same units as {@code timestampunits}, e.g.
+ * "3600" which would create 1 hour buckets if the time units were set to seconds.
+ * With downsampling, query fields will include a key/value pair with 
+ * {@code downsamplingkey} as the key (defaulting to "YCSBDS") and the value being
+ * a concatenation of {@code downsamplingfunction} and {@code downsamplinginterval},
+ * for example {@code YCSBDS=SUM60}.
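+ * <p>
+ * A minimal parsing sketch for the downsampling field (assuming {@code downsamplingKey}
+ * holds the configured {@code downsamplingkey} property; the trailing-digit split is an
+ * assumption for illustration, since the function name and interval are simply concatenated):
+ * <pre>{@code
+ * if (field.startsWith(downsamplingKey)) {
+ *   String spec = field.split(tagPairDelimiter)[1];                 // e.g. "SUM60"
+ *   String function = spec.replaceAll("[0-9]+$", "");               // "SUM"
+ *   int interval = Integer.parseInt(spec.replaceAll("[^0-9]", "")); // 60
+ * }
+ * }</pre>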
+ * <p>
+ * <b>Timestamps</b> - For every read, a random timestamp is selected from the interval
+ * set. If {@code querytimespan} has been set to a positive value, then the configured
+ * query time interval is added to the selected timestamp so the read passes the DB
+ * a range of times. Note that during the run phase, if no data was previously loaded,
+ * or if the {@code recordcount} set for the run phase exceeds what was loaded, reads may be sent
+ * to the DB with timestamps that are beyond the written data's time range (or even beyond the
+ * system clock of the DB).
+ * <p>
+ * <b>Deletes</b>
+ * <p>
+ * Because the delete API only accepts a single key, a full key and tag key/value 
+ * pair map is flattened into a single string for parsing by the database. Common
+ * workloads include deleting a single time series (wherein all tag key and values are
+ * defined), deleting all series containing a tag key and value or deleting all of the
+ * time series sharing a common time series key. 
+ * <p>
+ * Right now the workload supports deletes given a key along with time series tag key/value
+ * pairs, or a key with tags and a group-by on one or more tags (meaning delete all of
+ * the series with any value for the given tag key). The parameters are collapsed into
+ * a single string delimited with the character in the {@code deletedelimiter} property.
+ * For example, a delete request may look like {@code AA:AA=AA:AB=AA} to delete the
+ * first time series in the table above.
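+ * <p>
+ * A client could unpack such a delete key roughly as follows (a sketch assuming the
+ * default ":" and "=" delimiters are held in {@code deleteDelimiter} and
+ * {@code tagPairDelimiter}):
+ * <pre>{@code
+ * String[] parts = key.split(deleteDelimiter);   // e.g. ["AA", "AA=AA", "AB=AA"]
+ * String seriesKey = parts[0];
+ * for (int i = 1; i < parts.length; i++) {
+ *   String[] pair = parts[i].split(tagPairDelimiter);
+ *   // pair.length == 1 means match any value for this tag key (group-by style delete)
+ * }
+ * }</pre>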
+ * <p>
+ * <b>Threads</b>
+ * <p>
+ * For a multi-threaded execution, the number of time series keys set via the
+ * {@code fieldcount} property must be greater than or equal to the number of
+ * threads set via {@code threads}. This is because each thread chooses a subset
+ * of the total number of time series keys and is responsible for writing values
+ * for each time series containing those keys at each timestamp. Thus each thread
+ * will have its own timestamp generator, incrementing it each time every time series
+ * it is responsible for has had a value written.
+ * <p>
+ * Each thread may, however, issue reads and scans for any time series in the 
+ * complete set.
+ * <p>
+ * <b>Sparsity</b>
+ * <p>
+ * By default, during loads, every time series will have a data point written at every
+ * time stamp in the interval set. This is common in workloads where a sensor writes
+ * a value at regular intervals. However, some time series are only reported under 
+ * certain conditions. 
+ * <p>
+ * For example, a counter may track the number of errors over a 
+ * time period for a web service and only report when the value is greater than 1.
+ * Or a time series may include tags such as a user ID and IP address when a request
+ * arrives at the web service and only report values when that combination is seen.
+ * This means the time series will <i>not</i> have a value at every timestamp and in
+ * some cases there may be only a single value!
+ * <p>
+ * This workload has a {@code sparsity} parameter that controls how often a
+ * time series should record a value. The default value of 0.0 means every series 
+ * will get a value at every timestamp. A value of 0.95 will mean that for each 
+ * series, only 5% of the timestamps in the interval will have a value. The distribution
+ * of values is random.
+ * <p>
+ * <b>Notes/Warnings</b>
+ * <p>
+ * <ul>
+ * <li>Because time series keys and tag key/values are generated and stored in memory,
+ * be careful not to set the cardinality too high for the JVM's heap.</li>
+ * <li>When running for data integrity, a number of settings are incompatible and will
+ * throw errors. Check the error messages for details.</li>
+ * <li>Databases that support keys only and can't store tags should order and then 
+ * collapse the tag values using a delimiter. For example the series in the example 
+ * table at the top could be written as:
+ * <ul>
+ * <li>{@code AA.AA.AA}</li>
+ * <li>{@code AA.AA.AB}</li>
+ * <li>{@code AB.AA.AA}</li>
+ * <li>{@code AB.AA.AB}</li>
+ * </ul></li>
+ * </ul>
+ * <p>
+ * <b>TODOs</b>
+ * <p>
+ * <ul>
+ * <li>Support random time intervals. E.g. some series write every second, others every
+ * 60 seconds.</li>
+ * <li>Support random time series cardinality. Right now every series has the same 
+ * cardinality.</li>
+ * <li>Truly random timestamps per time series. We could use bitmaps to determine if
+ * a series has had a value written for a given timestamp. Right now all of the series
+ * are in sync time-wise.</li>
+ * <li>Possibly a real-time load where values are written with the current system time.
+ * It's more of a bulk-loading operation now.</li>
+ * </ul>
+ */
+public class TimeSeriesWorkload extends Workload {  
+  
+  /**
+   * The types of values written to the timeseries store.
+   */
+  public enum ValueType {
+    INTEGERS("integers"),
+    FLOATS("floats"),
+    MIXED("mixednumbers");
+    
+    protected final String name;
+    
+    ValueType(final String name) {
+      this.name = name;
+    }
+    
+    public static ValueType fromString(final String name) {
+      for (final ValueType type : ValueType.values()) {
+        if (type.name.equalsIgnoreCase(name)) {
+          return type;
+        }
+      }
+      throw new IllegalArgumentException("Unrecognized type: " + name);
+    }
+  }
+  
+  /** Name and default value for the timestamp key property. */
+  public static final String TIMESTAMP_KEY_PROPERTY = "timestampkey";
+  public static final String TIMESTAMP_KEY_PROPERTY_DEFAULT = "YCSBTS";
+  
+  /** Name and default value for the value key property. */
+  public static final String VALUE_KEY_PROPERTY = "valuekey";
+  public static final String VALUE_KEY_PROPERTY_DEFAULT = "YCSBV";
+  
+  /** Name and default value for the timestamp interval property. */    
+  public static final String TIMESTAMP_INTERVAL_PROPERTY = "timestampinterval";    
+  public static final String TIMESTAMP_INTERVAL_PROPERTY_DEFAULT = "60";    
+      
+  /** Name and default value for the timestamp units property. */   
+  public static final String TIMESTAMP_UNITS_PROPERTY = "timestampunits";    
+  public static final String TIMESTAMP_UNITS_PROPERTY_DEFAULT = "SECONDS"; 
+  
+  /** Name and default value for the number of tags property. */
+  public static final String TAG_COUNT_PROPERTY = "tagcount";
+  public static final String TAG_COUNT_PROPERTY_DEFAULT = "4";
+  
+  /** Name and default value for the tag value cardinality map property. */
+  public static final String TAG_CARDINALITY_PROPERTY = "tagcardinality";
+  public static final String TAG_CARDINALITY_PROPERTY_DEFAULT = "1, 2, 4, 8";
+  
+  /** Name and default value for the tag key length property. */
+  public static final String TAG_KEY_LENGTH_PROPERTY = "tagkeylength";
+  public static final String TAG_KEY_LENGTH_PROPERTY_DEFAULT = "8";
+  
+  /** Name and default value for the tag value length property. */
+  public static final String TAG_VALUE_LENGTH_PROPERTY = "tagvaluelength";
+  public static final String TAG_VALUE_LENGTH_PROPERTY_DEFAULT = "8";
+  
+  /** Name and default value for the tag pair delimiter property. */
+  public static final String PAIR_DELIMITER_PROPERTY = "tagpairdelimiter";
+  public static final String PAIR_DELIMITER_PROPERTY_DEFAULT = "=";
+  
+  /** Name and default value for the delete string delimiter property. */
+  public static final String DELETE_DELIMITER_PROPERTY = "deletedelimiter";
+  public static final String DELETE_DELIMITER_PROPERTY_DEFAULT = ":";
+  
+  /** Name and default value for the random timestamp write order property. */
+  public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY = "randomwritetimestamporder";
+  public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT = "false";
+  
+  /** Name and default value for the random time series write order property. */
+  public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY = "randomtimeseriesorder";
+  public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT = "true";
+  
+  /** Name and default value for the value types property. */
+  public static final String VALUE_TYPE_PROPERTY = "valuetype";
+  public static final String VALUE_TYPE_PROPERTY_DEFAULT = "floats";
+  
+  /** Name and default value for the sparsity property. */
+  public static final String SPARSITY_PROPERTY = "sparsity";
+  public static final String SPARSITY_PROPERTY_DEFAULT = "0.00";
+  
+  /** Name and default value for the delayed series percentage property. */
+  public static final String DELAYED_SERIES_PROPERTY = "delayedseries";
+  public static final String DELAYED_SERIES_PROPERTY_DEFAULT = "0.10";
+  
+  /** Name and default value for the delayed series intervals property. */
+  public static final String DELAYED_INTERVALS_PROPERTY = "delayedintervals";
+  public static final String DELAYED_INTERVALS_PROPERTY_DEFAULT = "5";
+  
+  /** Name and default value for the query time span property. */
+  public static final String QUERY_TIMESPAN_PROPERTY = "querytimespan";
+  public static final String QUERY_TIMESPAN_PROPERTY_DEFAULT = "0";
+  
+  /** Name and default value for the randomized query time span property. */
+  public static final String QUERY_RANDOM_TIMESPAN_PROPERTY = "queryrandomtimespan";
+  public static final String QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT = "false";
+  
+  /** Name and default value for the query time stamp delimiter property. */
+  public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY = "querytimespandelimiter";
+  public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT = ",";
+  
+  /** Name and default value for the group-by key property. */
+  public static final String GROUPBY_KEY_PROPERTY = "groupbykey";
+  public static final String GROUPBY_KEY_PROPERTY_DEFAULT = "YCSBGB";
+  
+  /** Name and default value for the group-by function property. */
+  public static final String GROUPBY_PROPERTY = "groupbyfunction";
+  
+  /** Name and default value for the group-by key map property. */
+  public static final String GROUPBY_KEYS_PROPERTY = "groupbykeys";
+  
+  /** Name and default value for the downsampling key property. */
+  public static final String DOWNSAMPLING_KEY_PROPERTY = "downsamplingkey";
+  public static final String DOWNSAMPLING_KEY_PROPERTY_DEFAULT = "YCSBDS";
+  
+  /** Name and default value for the downsampling function property. */
+  public static final String DOWNSAMPLING_FUNCTION_PROPERTY = "downsamplingfunction";
+  
+  /** Name and default value for the downsampling interval property. */
+  public static final String DOWNSAMPLING_INTERVAL_PROPERTY = "downsamplinginterval";
+  
+  /** The properties to pull settings from. */
+  protected Properties properties;
+  
+  /** Generators for keys, tag keys and tag values. */
+  protected Generator<String> keyGenerator;
+  protected Generator<String> tagKeyGenerator;
+  protected Generator<String> tagValueGenerator;
+  
+  /** The timestamp key, defaults to "YCSBTS". */
+  protected String timestampKey;
+  
+  /** The value key, defaults to "YCSBDS". */
+  protected String valueKey;
+  
+  /** The number of time units in between timestamps. */
+  protected int timestampInterval;
+  
+  /** The units of time the timestamp and various intervals represent. */
+  protected TimeUnit timeUnits;
+  
+  /** Whether or not to randomize the timestamp order when writing. */
+  protected boolean randomizeTimestampOrder;
+  
+  /** Whether or not to randomize (shuffle) the time series order. NOT compatible
+   * with data integrity. */
+  protected boolean randomizeTimeseriesOrder;
+  
+  /** The type of values to generate when writing data. */
+  protected ValueType valueType;
+  
+  /** Used to calculate an offset for each time series. */
+  protected int[] cumulativeCardinality;
+  
+  /** The calculated total cardinality based on the config. */
+  protected int totalCardinality;
+  
+  /** The calculated per-time-series-key cardinality. I.e. the number of unique
+   * tag key and value combinations. */
+  protected int perKeyCardinality;
+  
+  /** How much data to scan for in each call. */
+  protected NumberGenerator scanlength;
+  
+  /** A generator used to select a random time series key per read/scan. */
+  protected NumberGenerator keychooser;
+  
+  /** A generator to select what operation to perform during the run phase. */
+  protected DiscreteGenerator operationchooser;
+  
+  /** The maximum number of interval offsets from the starting timestamp. Calculated
+   * based on the number of records configured for the run. */
+  protected int maxOffsets;
+  
+  /** The number of records or operations to perform for this run. */
+  protected int recordcount;
+  
+  /** The number of tag pairs per time series. */
+  protected int tagPairs;
+  
+  /** The table we'll write to. */
+  protected String table;
+  
+  /** How many time series keys will be generated. */
+  protected int numKeys;
+  
+  /** The generated list of possible time series key values. */
+  protected String[] keys;
+
+  /** The generated list of possible tag key values. */
+  protected String[] tagKeys;
+  
+  /** The generated list of possible tag value values. */
+  protected String[] tagValues;
+  
+  /** The cardinality for each tag key. */
+  protected int[] tagCardinality;
+  
+  /** A helper to skip non-incrementing tag values. */
+  protected int firstIncrementableCardinality;
+  
+  /** How sparse the data written should be. */
+  protected double sparsity;
+  
+  /** The percentage of time series that should be delayed in writes. */
+  protected double delayedSeries;
+  
+  /** The maximum number of intervals to delay a series. */
+  protected int delayedIntervals;
+  
+  /** Optional query time interval during reads/scans. */
+  protected int queryTimeSpan;
+  
+  /** Whether or not the actual interval should be randomly chosen, using 
+   * queryTimeSpan as the maximum value. */
+  protected boolean queryRandomTimeSpan;
+  
+  /** The delimiter for tag pairs in fields. */
+  protected String tagPairDelimiter;
+  
+  /** The delimiter between parameters for the delete key. */
+  protected String deleteDelimiter;
+  
+  /** The delimiter between timestamps for query time spans. */
+  protected String queryTimeSpanDelimiter;
+  
+  /** Whether or not to issue group-by queries. */
+  protected boolean groupBy;
+  
+  /** The key used for group-by tag keys. */
+  protected String groupByKey;
+  
+  /** The function used for group-by's. */
+  protected String groupByFunction;
+  
+  /** The tag keys to group on. */
+  protected boolean[] groupBys;
+  
+  /** Whether or not to issue downsampling queries. */
+  protected boolean downsample;
+  
+  /** The key used for downsampling tag keys. */
+  protected String downsampleKey;
+  
+  /** The downsampling function. */
+  protected String downsampleFunction;
+  
+  /** The downsampling interval. */
+  protected int downsampleInterval;
+
+  /**
+   * Set to true if want to check correctness of reads. Must also
+   * be set to true during loading phase to function.
+   */
+  protected boolean dataintegrity;
+  
+  /** Measurements to write data integrity results to. */
+  protected Measurements measurements = Measurements.getMeasurements();
+  
+  @Override
+  public void init(final Properties p) throws WorkloadException {
+    properties = p;
+    recordcount =
+        Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, 
+            Client.DEFAULT_RECORD_COUNT));
+    if (recordcount == 0) {
+      recordcount = Integer.MAX_VALUE;
+    }
+    timestampKey = p.getProperty(TIMESTAMP_KEY_PROPERTY, TIMESTAMP_KEY_PROPERTY_DEFAULT);
+    valueKey = p.getProperty(VALUE_KEY_PROPERTY, VALUE_KEY_PROPERTY_DEFAULT);
+    operationchooser = CoreWorkload.createOperationGenerator(properties);
+    
+    final int maxscanlength =
+        Integer.parseInt(p.getProperty(CoreWorkload.MAX_SCAN_LENGTH_PROPERTY, 
+            CoreWorkload.MAX_SCAN_LENGTH_PROPERTY_DEFAULT));
+    String scanlengthdistrib =
+        p.getProperty(CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY, 
+            CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
+    
+    if (scanlengthdistrib.compareTo("uniform") == 0) {
+      scanlength = new UniformLongGenerator(1, maxscanlength);
+    } else if (scanlengthdistrib.compareTo("zipfian") == 0) {
+      scanlength = new ZipfianGenerator(1, maxscanlength);
+    } else {
+      throw new WorkloadException(
+          "Distribution \"" + scanlengthdistrib + "\" not allowed for scan length");
+    }
+    
+    randomizeTimestampOrder = Boolean.parseBoolean(p.getProperty(
+        RANDOMIZE_TIMESTAMP_ORDER_PROPERTY, 
+        RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT));
+    randomizeTimeseriesOrder = Boolean.parseBoolean(p.getProperty(
+        RANDOMIZE_TIMESERIES_ORDER_PROPERTY, 
+        RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT));
+    
+    // setup the cardinality
+    numKeys = Integer.parseInt(p.getProperty(CoreWorkload.FIELD_COUNT_PROPERTY, 
+        CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));
+    tagPairs = Integer.parseInt(p.getProperty(TAG_COUNT_PROPERTY, 
+        TAG_COUNT_PROPERTY_DEFAULT));
+    sparsity = Double.parseDouble(p.getProperty(SPARSITY_PROPERTY, SPARSITY_PROPERTY_DEFAULT));
+    tagCardinality = new int[tagPairs];
+    
+    final String requestdistrib =
+        p.getProperty(CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY, 
+            CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
+    if (requestdistrib.compareTo("uniform") == 0) {
+      keychooser = new UniformLongGenerator(0, numKeys - 1);
+    } else if (requestdistrib.compareTo("sequential") == 0) {
+      keychooser = new SequentialGenerator(0, numKeys - 1);
+    } else if (requestdistrib.compareTo("zipfian") == 0) {
+      keychooser = new ScrambledZipfianGenerator(0, numKeys - 1);
+    //} else if (requestdistrib.compareTo("latest") == 0) {
+    //  keychooser = new SkewedLatestGenerator(transactioninsertkeysequence);
+    } else if (requestdistrib.equals("hotspot")) {
+      double hotsetfraction =
+          Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_DATA_FRACTION, 
+              CoreWorkload.HOTSPOT_DATA_FRACTION_DEFAULT));
+      double hotopnfraction =
+          Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_OPN_FRACTION, 
+              CoreWorkload.HOTSPOT_OPN_FRACTION_DEFAULT));
+      keychooser = new HotspotIntegerGenerator(0, numKeys - 1,
+          hotsetfraction, hotopnfraction);
+    } else {
+      throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\"");
+    }
+    
+    // figure out the start timestamp based on the units, cardinality and interval
+    try {
+      timestampInterval = Integer.parseInt(p.getProperty(
+          TIMESTAMP_INTERVAL_PROPERTY, TIMESTAMP_INTERVAL_PROPERTY_DEFAULT));
+    } catch (NumberFormatException nfe) {
+      throw new WorkloadException("Unable to parse the " + 
+          TIMESTAMP_INTERVAL_PROPERTY, nfe);
+    }
+    
+    try {
+      timeUnits = TimeUnit.valueOf(p.getProperty(TIMESTAMP_UNITS_PROPERTY, 
+          TIMESTAMP_UNITS_PROPERTY_DEFAULT).toUpperCase());
+    } catch (IllegalArgumentException e) {
+      throw new WorkloadException("Unknown time unit type", e);
+    }
+    if (timeUnits == TimeUnit.NANOSECONDS || timeUnits == TimeUnit.MICROSECONDS) {
+      throw new WorkloadException("YCSB doesn't support " + timeUnits + 
+          " at this time.");
+    }
+    
+    tagPairDelimiter = p.getProperty(PAIR_DELIMITER_PROPERTY, PAIR_DELIMITER_PROPERTY_DEFAULT);
+    deleteDelimiter = p.getProperty(DELETE_DELIMITER_PROPERTY, DELETE_DELIMITER_PROPERTY_DEFAULT);
+    dataintegrity = Boolean.parseBoolean(
+        p.getProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, 
+            CoreWorkload.DATA_INTEGRITY_PROPERTY_DEFAULT));
+    
+    queryTimeSpan = Integer.parseInt(p.getProperty(QUERY_TIMESPAN_PROPERTY, 
+        QUERY_TIMESPAN_PROPERTY_DEFAULT));
+    queryRandomTimeSpan = Boolean.parseBoolean(p.getProperty(QUERY_RANDOM_TIMESPAN_PROPERTY, 
+        QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT));
+    queryTimeSpanDelimiter = p.getProperty(QUERY_TIMESPAN_DELIMITER_PROPERTY, 
+        QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT);
+    
+    groupByKey = p.getProperty(GROUPBY_KEY_PROPERTY, GROUPBY_KEY_PROPERTY_DEFAULT);
+    groupByFunction = p.getProperty(GROUPBY_PROPERTY);
+    if (groupByFunction != null && !groupByFunction.isEmpty()) {
+      final String groupByKeys = p.getProperty(GROUPBY_KEYS_PROPERTY);
+      if (groupByKeys == null || groupByKeys.isEmpty()) {
+        throw new WorkloadException("Group by was enabled but no keys were specified.");
+      }
+      final String[] gbKeys = groupByKeys.split(",");
+      if (gbKeys.length != tagPairs) {
+        throw new WorkloadException("Only " + gbKeys.length + " group by keys "
+            + "were specified but there were " + tagPairs + " tag keys given.");
+      }
+      groupBys = new boolean[gbKeys.length];
+      for (int i = 0; i < gbKeys.length; i++) {
+        groupBys[i] = Integer.parseInt(gbKeys[i].trim()) != 0;
+      }
+      groupBy = true;
+    }
+    
+    downsampleKey = p.getProperty(DOWNSAMPLING_KEY_PROPERTY, DOWNSAMPLING_KEY_PROPERTY_DEFAULT);
+    downsampleFunction = p.getProperty(DOWNSAMPLING_FUNCTION_PROPERTY);
+    if (downsampleFunction != null && !downsampleFunction.isEmpty()) {
+      final String interval = p.getProperty(DOWNSAMPLING_INTERVAL_PROPERTY);
+      if (interval == null || interval.isEmpty()) {
+        throw new WorkloadException("'" + DOWNSAMPLING_INTERVAL_PROPERTY + "' was missing despite '" 
+            + DOWNSAMPLING_FUNCTION_PROPERTY + "' being set.");
+      }
+      downsampleInterval = Integer.parseInt(interval);
+      downsample = true;
+    }
+    
+    delayedSeries = Double.parseDouble(p.getProperty(DELAYED_SERIES_PROPERTY, DELAYED_SERIES_PROPERTY_DEFAULT));
+    delayedIntervals = Integer.parseInt(p.getProperty(DELAYED_INTERVALS_PROPERTY, DELAYED_INTERVALS_PROPERTY_DEFAULT));
+    
+    valueType = ValueType.fromString(p.getProperty(VALUE_TYPE_PROPERTY, VALUE_TYPE_PROPERTY_DEFAULT));
+    table = p.getProperty(CoreWorkload.TABLENAME_PROPERTY, CoreWorkload.TABLENAME_PROPERTY_DEFAULT);
+    initKeysAndTags();
+    validateSettings();
+  }
+  
+  @Override
+  public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException {
+    if (properties == null) {
+      throw new WorkloadException("Workload has not been initialized.");
+    }
+    return new ThreadState(mythreadid, threadcount);
+  }
+  
+  @Override
+  public boolean doInsert(DB db, Object threadstate) {
+    if (threadstate == null) {
+      throw new IllegalStateException("Missing thread state.");
+    }
+    final Map<String, ByteIterator> tags = new TreeMap<String, ByteIterator>();
+    final String key = ((ThreadState)threadstate).nextDataPoint(tags, true);
+    if (db.insert(table, key, tags) == Status.OK) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public boolean doTransaction(DB db, Object threadstate) {
+    if (threadstate == null) {
+      throw new IllegalStateException("Missing thread state.");
+    }
+    switch (operationchooser.nextString()) {
+    case "READ":
+      doTransactionRead(db, threadstate);
+      break;
+    case "UPDATE":
+      doTransactionUpdate(db, threadstate);
+      break;
+    case "INSERT": 
+      doTransactionInsert(db, threadstate);
+      break;
+    case "SCAN":
+      doTransactionScan(db, threadstate);
+      break;
+    case "DELETE":
+      doTransactionDelete(db, threadstate);
+      break;
+    default:
+      return false;
+    }
+    return true;
+  }
+
+  protected void doTransactionRead(final DB db, Object threadstate) {
+    final ThreadState state = (ThreadState) threadstate;
+    final String keyname = keys[keychooser.nextValue().intValue()];
+    
+    int offsets = state.queryOffsetGenerator.nextValue().intValue();
+    //int offsets = Utils.random().nextInt(maxOffsets - 1);
+    final long startTimestamp;
+    if (offsets > 0) {
+      startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets);
+    } else {
+      startTimestamp = state.startTimestamp;
+    }
+    
+    // random tags
+    Set<String> fields = new HashSet<String>();
+    for (int i = 0; i < tagPairs; ++i) {
+      if (groupBy && groupBys[i]) {
+        fields.add(tagKeys[i]);
+      } else {
+        fields.add(tagKeys[i] + tagPairDelimiter + 
+            tagValues[Utils.random().nextInt(tagCardinality[i])]);
+      }
+    }
+    
+    if (queryTimeSpan > 0) {
+      final long endTimestamp;
+      if (queryRandomTimeSpan) {
+        endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval));
+      } else {
+        endTimestamp = startTimestamp + queryTimeSpan;
+      }
+      fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp);
+    } else {
+      fields.add(timestampKey + tagPairDelimiter + startTimestamp);  
+    }
+    if (groupBy) {
+      fields.add(groupByKey + tagPairDelimiter + groupByFunction);
+    }
+    if (downsample) {
+      fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + downsampleInterval);
+    }
+    
+    final Map<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
+    final Status status = db.read(table, keyname, fields, cells);
+    
+    if (dataintegrity && status == Status.OK) {
+      verifyRow(keyname, cells);
+    }
+  }
+  
+  protected void doTransactionUpdate(final DB db, Object threadstate) {
+    if (threadstate == null) {
+      throw new IllegalStateException("Missing thread state.");
+    }
+    final Map<String, ByteIterator> tags = new TreeMap<String, ByteIterator>();
+    final String key = ((ThreadState)threadstate).nextDataPoint(tags, false);
+    db.update(table, key, tags);
+  }
+  
+  protected void doTransactionInsert(final DB db, Object threadstate) {
+    doInsert(db, threadstate);
+  }
+  
+  protected void doTransactionScan(final DB db, Object threadstate) {
+    final ThreadState state = (ThreadState) threadstate;
+    
+    final String keyname = keys[Utils.random().nextInt(keys.length)];
+    
+    // choose a random scan length
+    int len = scanlength.nextValue().intValue();
+    
+    int offsets = Utils.random().nextInt(maxOffsets - 1);
+    final long startTimestamp;
+    if (offsets > 0) {
+      startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets);
+    } else {
+      startTimestamp = state.startTimestamp;
+    }
+    
+    // random tags
+    Set<String> fields = new HashSet<String>();
+    for (int i = 0; i < tagPairs; ++i) {
+      if (groupBy && groupBys[i]) {
+        fields.add(tagKeys[i]);
+      } else {
+        fields.add(tagKeys[i] + tagPairDelimiter + 
+            tagValues[Utils.random().nextInt(tagCardinality[i])]);
+      }
+    }
+    
+    if (queryTimeSpan > 0) {
+      final long endTimestamp;
+      if (queryRandomTimeSpan) {
+        endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval));
+      } else {
+        endTimestamp = startTimestamp + queryTimeSpan;
+      }
+      fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp);
+    } else {
+      fields.add(timestampKey + tagPairDelimiter + startTimestamp);  
+    }
+    if (groupBy) {
+      fields.add(groupByKey + tagPairDelimiter + groupByFunction);
+    }
+    if (downsample) {
+      fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + downsampleInterval);
+    }
+    
+    final Vector<HashMap<String, ByteIterator>> results = new Vector<HashMap<String, ByteIterator>>();
+    db.scan(table, keyname, len, fields, results);
+  }
+  
+  protected void doTransactionDelete(final DB db, Object threadstate) {
+    final ThreadState state = (ThreadState) threadstate;
+    
+    final StringBuilder buf = new StringBuilder().append(keys[Utils.random().nextInt(keys.length)]);
+    
+    int offsets = Utils.random().nextInt(maxOffsets - 1);
+    final long startTimestamp;
+    if (offsets > 0) {
+      startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets);
+    } else {
+      startTimestamp = state.startTimestamp;
+    }
+    
+    // random tags
+    for (int i = 0; i < tagPairs; ++i) {
+      if (groupBy && groupBys[i]) {
+        buf.append(deleteDelimiter)
+           .append(tagKeys[i]);
+      } else {
+        buf.append(deleteDelimiter).append(tagKeys[i] + tagPairDelimiter + 
+            tagValues[Utils.random().nextInt(tagCardinality[i])]);
+      }
+    }
+    
+    if (queryTimeSpan > 0) {
+      final long endTimestamp;
+      if (queryRandomTimeSpan) {
+        endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval));
+      } else {
+        endTimestamp = startTimestamp + queryTimeSpan;
+      }
+      buf.append(deleteDelimiter)
+         .append(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp);
+    } else {
+      buf.append(deleteDelimiter)
+         .append(timestampKey + tagPairDelimiter + startTimestamp);  
+    }
+    
+    db.delete(table, buf.toString());
+  }
+  
+  /**
+   * Parses the values returned by a read or scan operation and determines whether
+   * or not the integer value matches the hash and timestamp of the original timestamp.
+   * Only works for raw data points, will not work for group-by's or downsampled data.
+   * @param key The time series key.
+   * @param cells The cells read by the DB.
+   * @return {@link Status#OK} if the data matched or {@link Status#UNEXPECTED_STATE} if
+   * the data did not match.
+   */
+  protected Status verifyRow(final String key, final Map<String, ByteIterator> cells) {
+    Status verifyStatus = Status.UNEXPECTED_STATE;
+    long startTime = System.nanoTime();
+
+    double value = 0;
+    long timestamp = 0;
+    final TreeMap<String, String> validationTags = new TreeMap<String, String>();
+    for (final Entry<String, ByteIterator> entry : cells.entrySet()) {
+      if (entry.getKey().equals(timestampKey)) {
+        final NumericByteIterator it = (NumericByteIterator) entry.getValue();
+        timestamp = it.getLong();
+      } else if (entry.getKey().equals(valueKey)) {
+        final NumericByteIterator it = (NumericByteIterator) entry.getValue();
+        value = it.isFloatingPoint() ? it.getDouble() : it.getLong();
+      } else {
+        validationTags.put(entry.getKey(), entry.getValue().toString());
+      }
+    }
+
+    if (validationFunction(key, timestamp, validationTags) == value) {
+      verifyStatus = Status.OK;
+    }
+    long endTime = System.nanoTime();
+    measurements.measure("VERIFY", (int) (endTime - startTime) / 1000);
+    measurements.reportStatus("VERIFY", verifyStatus);
+    return verifyStatus;
+  }
+  
+  /**
+   * Function used for generating a deterministic hash based on the combination
+   * of metric, tags and timestamp.
+   * @param key A non-null string representing the key.
+   * @param timestamp A timestamp in the proper units for the workload.
+   * @param tags A non-null map of tag keys and values NOT including the YCSB
+   * key or timestamp.
+   * @return A hash value as an 8 byte integer.
+   */
+  protected long validationFunction(final String key, final long timestamp, 
+                                    final TreeMap<String, String> tags) {
+    final StringBuilder validationBuffer = new StringBuilder(keys[0].length() + 
+        (tagPairs * tagKeys[0].length()) + (tagPairs * tagCardinality[1]));
+    for (final Entry<String, String> pair : tags.entrySet()) {
+      validationBuffer.append(pair.getKey()).append(pair.getValue());
+    }
+    return (long) validationBuffer.toString().hashCode() ^ timestamp;
+  }
+  
+  /**
+   * Breaks out the keys, tags and cardinality initialization in another method
+   * to keep CheckStyle happy.
+   * @throws WorkloadException If something goes pear shaped.
+   */
+  protected void initKeysAndTags() throws WorkloadException {
+    final int keyLength = Integer.parseInt(properties.getProperty(
+        CoreWorkload.FIELD_LENGTH_PROPERTY, 
+        CoreWorkload.FIELD_LENGTH_PROPERTY_DEFAULT));
+    final int tagKeyLength = Integer.parseInt(properties.getProperty(
+        TAG_KEY_LENGTH_PROPERTY, TAG_KEY_LENGTH_PROPERTY_DEFAULT));
+    final int tagValueLength = Integer.parseInt(properties.getProperty(
+        TAG_VALUE_LENGTH_PROPERTY, TAG_VALUE_LENGTH_PROPERTY_DEFAULT));
+    
+    keyGenerator = new IncrementingPrintableStringGenerator(keyLength);
+    tagKeyGenerator = new IncrementingPrintableStringGenerator(tagKeyLength);
+    tagValueGenerator = new IncrementingPrintableStringGenerator(tagValueLength);
+    
+    final int threads = Integer.parseInt(properties.getProperty(Client.THREAD_COUNT_PROPERTY, "1"));
+    final String tagCardinalityString = properties.getProperty(
+        TAG_CARDINALITY_PROPERTY, 
+        TAG_CARDINALITY_PROPERTY_DEFAULT);
+    final String[] tagCardinalityParts = tagCardinalityString.split(",");
+    int idx = 0;
+    totalCardinality = numKeys;
+    perKeyCardinality = 1;
+    int maxCardinality = 0;
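+    // Parse the comma separated per-tag cardinalities, accumulating the total
+    // number of unique time series and tracking the widest cardinality so the
+    // tag value array can be sized to hold every possible value.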
+    for (final String card : tagCardinalityParts) {
+      try {
+        tagCardinality[idx] = Integer.parseInt(card.trim());
+      } catch (NumberFormatException nfe) {
+        throw new WorkloadException("Unable to parse cardinality: " + 
+            card, nfe);
+      }
+      if (tagCardinality[idx] < 1) {
+        throw new WorkloadException("Cardinality must be greater than zero: " + 
+            tagCardinality[idx]);
+      }
+      totalCardinality *= tagCardinality[idx];
+      perKeyCardinality *= tagCardinality[idx];
+      if (tagCardinality[idx] > maxCardinality) {
+        maxCardinality = tagCardinality[idx];
+      }
+      ++idx;
+      if (idx >= tagPairs) {
+        // we have more cardinalities than tag keys so bail at this point.
+        break;
+      }
+    }
+    if (numKeys < threads) {
+      throw new WorkloadException("Field count " + numKeys + " (keys for time "
+          + "series workloads) must be greater or equal to the number of "
+          + "threads " + threads);
+    }
+    
+    // fill any tags without an explicit cardinality with 1
+    while (idx < tagPairs) {
+      tagCardinality[idx++] = 1;
+    }
+    
+    for (int i = 0; i < tagCardinality.length; ++i) {
+      if (tagCardinality[i] > 1) {
+        firstIncrementableCardinality = i;
+        break;
+      }
+    }
+    
+    keys = new String[numKeys];
+    tagKeys = new String[tagPairs];
+    tagValues = new String[maxCardinality];
+    for (int i = 0; i < numKeys; ++i) {
+      keys[i] = keyGenerator.nextString();
+    }
+
+    for (int i = 0; i < tagPairs; ++i) {
+      tagKeys[i] = tagKeyGenerator.nextString();
+    }
+    
+    for (int i = 0; i < maxCardinality; i++) {
+      tagValues[i] = tagValueGenerator.nextString();
+    }
+    if (randomizeTimeseriesOrder) {
+      Utils.shuffleArray(keys);
+      Utils.shuffleArray(tagValues);
+    }
+    
+    maxOffsets = (recordcount / totalCardinality) + 1;
+    final int[] keyAndTagCardinality = new int[tagPairs + 1];
+    keyAndTagCardinality[0] = numKeys;
+    for (int i = 0; i < tagPairs; i++) {
+      keyAndTagCardinality[i + 1] = tagCardinality[i];
+    }
+    
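+    // cumulativeCardinality[i] holds the product of the cardinalities of every
+    // level that follows level i (the key is level 0, then each tag key in
+    // order), so a key index plus the tag value indices can be folded into a
+    // single unique series index in nextDataPoint(). The last entry is always 1.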
+    cumulativeCardinality = new int[keyAndTagCardinality.length];
+    for (int i = 0; i < keyAndTagCardinality.length; i++) {
+      int cumulation = 1;
+      for (int x = i; x <= keyAndTagCardinality.length - 1; x++) {
+        cumulation *= keyAndTagCardinality[x];
+      }
+      if (i > 0) {
+        cumulativeCardinality[i - 1] = cumulation;
+      }
+    }
+    cumulativeCardinality[cumulativeCardinality.length - 1] = 1;
+  }
+  
+  /**
+   * Makes sure the settings as given are compatible.
+   * @throws WorkloadException If one or more settings were invalid.
+   */
+  protected void validateSettings() throws WorkloadException {
+    if (dataintegrity) {
+      if (valueType != ValueType.INTEGERS) {
+        throw new WorkloadException("Data integrity was enabled. 'valuetype' must "
+            + "be set to 'integers'.");
+      }
+      if (groupBy) {
+        throw new WorkloadException("Data integrity was enabled. 'groupbyfunction' must "
+            + "be empty or null.");
+      }
+      if (downsample) {
+        throw new WorkloadException("Data integrity was enabled. 'downsamplingfunction' must "
+            + "be empty or null.");
+      }
+      if (queryTimeSpan > 0) {
+        throw new WorkloadException("Data integrity was enabled. 'querytimespan' must "
+            + "be empty or 0.");
+      }
+      if (randomizeTimeseriesOrder) {
+        throw new WorkloadException("Data integrity was enabled. 'randomizetimeseriesorder' must "
+            + "be false.");
+      }
+      final String startTimestamp = properties.getProperty(CoreWorkload.INSERT_START_PROPERTY);
+      if (startTimestamp == null || startTimestamp.isEmpty()) {
+        throw new WorkloadException("Data integrity was enabled. 'insertstart' must "
+            + "be set to a Unix Epoch timestamp.");
+      }
+    }
+  }
+  
+  /**
+   * Thread state class holding thread local generators and indices.
+   */
+  protected class ThreadState {
+    /** The timestamp generator for this thread. */
+    protected final UnixEpochTimestampGenerator timestampGenerator;
+    
+    /** An offset generator to select a random offset for queries. */
+    protected final NumberGenerator queryOffsetGenerator;
+    
+    /** The current write key index. */
+    protected int keyIdx;
+    
+    /** The starting fence for writing keys. */
+    protected int keyIdxStart;
+    
+    /** The ending fence for writing keys. */
+    protected int keyIdxEnd;
+    
+    /** Indices for each tag value for writes. */
+    protected int[] tagValueIdxs;
+
+    /** Whether or not all time series have written values for the current timestamp. */
+    protected boolean rollover;
+    
+    /** The starting timestamp. */
+    protected long startTimestamp;
+    
+    /**
+     * Default ctor.
+     * @param threadID The zero based thread ID.
+     * @param threadCount The total number of threads.
+     * @throws WorkloadException If something went pear shaped.
+     */
+    protected ThreadState(final int threadID, final int threadCount) throws WorkloadException {
+      int totalThreads = threadCount > 0 ? threadCount : 1;
+      
+      if (threadID >= totalThreads) {
+        throw new IllegalStateException("Thread ID " + threadID + " cannot be greater "
+            + "than or equal than the thread count " + totalThreads);
+      }
+      if (keys.length < threadCount) {
+        throw new WorkloadException("Thread count " + totalThreads + " must be greater "
+            + "than or equal to key count " + keys.length);
+      }
+      
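+      // Partition the key space evenly across threads; the last thread also
+      // picks up any remainder so every key is written by exactly one thread.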
+      int keysPerThread = keys.length / totalThreads;
+      keyIdx = keysPerThread * threadID;
+      keyIdxStart = keyIdx;
+      if (totalThreads - 1 == threadID) {
+        keyIdxEnd = keys.length;
+      } else {
+        keyIdxEnd = keyIdxStart + keysPerThread;
+      }
+      
+      tagValueIdxs = new int[tagPairs]; // all zeros
+      
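+      // Seed the timestamp generator from 'insertstart' when it is provided so
+      // the load and run phases share the same starting offset; when randomized
+      // timestamp order is enabled, a discrete generator hands out the offsets
+      // in random rather than linear order.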
+      final String startingTimestamp = 
+          properties.getProperty(CoreWorkload.INSERT_START_PROPERTY);
+      if (startingTimestamp == null || startingTimestamp.isEmpty()) {
+        timestampGenerator = randomizeTimestampOrder ? 
+            new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits, maxOffsets) :
+            new UnixEpochTimestampGenerator(timestampInterval, timeUnits);
+      } else {
+        try {
+          timestampGenerator = randomizeTimestampOrder ? 
+              new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits, 
+                  Long.parseLong(startingTimestamp), maxOffsets) :
+              new UnixEpochTimestampGenerator(timestampInterval, timeUnits, 
+                  Long.parseLong(startingTimestamp));
+        } catch (NumberFormatException nfe) {
+          throw new WorkloadException("Unable to parse the " + 
+              CoreWorkload.INSERT_START_PROPERTY, nfe);
+        }
+      }
+      // Set the last value properly for the timestamp, otherwise it may start 
+      // one interval ago.
+      startTimestamp = timestampGenerator.nextValue();
+      // TODO - pick it
+      queryOffsetGenerator = new UniformLongGenerator(0, maxOffsets - 2);
+    }
+    
+    /**
+     * Generates the next data point (tags, timestamp and value) for the thread.
+     * @param map An initialized map to populate with tag keys and values as well
+     * as the timestamp and actual value.
+     * @param isInsert True for an insert, false for an update. Updates will pick
+     * an older timestamp (when randomized timestamp order isn't enabled).
+     * @return The next key to write.
+     */
+    protected String nextDataPoint(final Map<String, ByteIterator> map, final boolean isInsert) {
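+      // With sparsity > 0, randomly skip over a number of series on this call so
+      // that not every series receives a value at every timestamp; with
+      // sparsity == 0, exactly one data point is emitted per call.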
+      int iterations = sparsity <= 0 ? 1 : 
+          Utils.random().nextInt((int) ((double) perKeyCardinality * sparsity));
+      if (iterations < 1) {
+        iterations = 1;
+      }
+      while (true) {
+        iterations--;
+        if (rollover) {
+          timestampGenerator.nextValue();
+          rollover = false;
+        }
+        String key = null;
+        if (iterations <= 0) {
+          final TreeMap<String, String> validationTags;
+          if (dataintegrity) {
+            validationTags = new TreeMap<String, String>();
+          } else {
+            validationTags = null;
+          }
+          key = keys[keyIdx];
+          int overallIdx = keyIdx * cumulativeCardinality[0];
+          for (int i = 0; i < tagPairs; ++i) {
+            int tvidx = tagValueIdxs[i];
+            map.put(tagKeys[i], new StringByteIterator(tagValues[tvidx]));
+            if (dataintegrity) {
+              validationTags.put(tagKeys[i], tagValues[tvidx]);
+            }
+            if (delayedSeries > 0) {
+              overallIdx += (tvidx * cumulativeCardinality[i + 1]);
+            }
+          }
+          
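+          // Updates pick a random, previously written timestamp between the
+          // start and the current timestamp. Inserts use the current timestamp,
+          // optionally shifted back a few intervals for "delayed" series.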
+          if (!isInsert) {
+            final long delta = (timestampGenerator.currentValue() - startTimestamp) / timestampInterval;
+            final int intervals = Utils.random().nextInt((int) delta);
+            map.put(timestampKey, new NumericByteIterator(startTimestamp + (intervals * timestampInterval)));
+          } else if (delayedSeries > 0) {
+            // See if the series falls in a delay bucket and calculate an offset earlier
+            // than the current timestamp value if so.
+            double pct = (double) overallIdx / (double) totalCardinality;
+            if (pct < delayedSeries) {
+              int modulo = overallIdx % delayedIntervals;
+              if (modulo < 0) {
+                modulo *= -1;
+              }
+              map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue() - 
+                  timestampInterval * modulo));
+            } else {
+              map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue()));
+            }
+          } else {
+            map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue()));
+          }
+          
+          if (dataintegrity) {
+            map.put(valueKey, new NumericByteIterator(validationFunction(key, 
+                timestampGenerator.currentValue(), validationTags)));
+          } else {
+            switch (valueType) {
+            case INTEGERS:
+              map.put(valueKey, new NumericByteIterator(Utils.random().nextInt()));
+              break;
+            case FLOATS:
+              map.put(valueKey, new NumericByteIterator(
+                  Utils.random().nextDouble() * (double) 100000));
+              break;
+            case MIXED:
+              if (Utils.random().nextBoolean()) {
+                map.put(valueKey, new NumericByteIterator(Utils.random().nextInt()));
+              } else {
+                map.put(valueKey, new NumericByteIterator(
+                    Utils.random().nextDouble() * (double) 100000));
+              }
+              break;
+            default:
+              throw new IllegalStateException("Somehow we didn't have a value "
+                  + "type configured that we support: " + valueType);
+            }        
+          }
+        }
+        
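+        // Advance the tag value indices like an odometer, carrying from the
+        // last tag toward the first. When the first incrementable tag wraps,
+        // every combination for the current key has been emitted, so move on to
+        // the next key; once the keys wrap as well, flag a rollover so the
+        // timestamp advances before the next data point is generated.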
+        boolean tagRollover = false;
+        for (int i = tagCardinality.length - 1; i >= 0; --i) {
+          if (tagCardinality[i] <= 1) {
+            // nothing to increment here
+            continue;
+          }
+          
+          if (tagValueIdxs[i] + 1 >= tagCardinality[i]) {
+            tagValueIdxs[i] = 0;
+            if (i == firstIncrementableCardinality) {
+              tagRollover = true;
+            }
+          } else {
+            ++tagValueIdxs[i];
+            break;
+          }
+        }
+        
+        if (tagRollover) {
+          if (keyIdx + 1 >= keyIdxEnd) {
+            keyIdx = keyIdxStart;
+            rollover = true;
+          } else {
+            ++keyIdx;
+          }
+        }
+        
+        if (iterations <= 0) {
+          return key;
+        }
+      }
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java
new file mode 100644
index 0000000000000000000000000000000000000000..55acc1f625b8d046f2c5b8d8a6907dae72e348a8
--- /dev/null
+++ b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java
@@ -0,0 +1,550 @@
+/**
+ * Copyright (c) 2017 YCSB contributors All rights reserved.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package com.yahoo.ycsb.workloads;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.Vector;
+
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.Client;
+import com.yahoo.ycsb.DB;
+import com.yahoo.ycsb.NumericByteIterator;
+import com.yahoo.ycsb.Status;
+import com.yahoo.ycsb.StringByteIterator;
+import com.yahoo.ycsb.Utils;
+import com.yahoo.ycsb.WorkloadException;
+import com.yahoo.ycsb.measurements.Measurements;
+
+import org.testng.annotations.Test;
+
+public class TestTimeSeriesWorkload {
+  
+  @Test
+  public void twoThreads() throws Exception {
+    final Properties p = getUTProperties();
+    Measurements.setProperties(p);
+    
+    final TimeSeriesWorkload wl = new TimeSeriesWorkload();
+    wl.init(p);
+    Object threadState = wl.initThread(p, 0, 2);
+    
+    MockDB db = new MockDB();
+    for (int i = 0; i < 74; i++) {
+      assertTrue(wl.doInsert(db, threadState));
+    }
+    
+    assertEquals(db.keys.size(), 74);
+    assertEquals(db.values.size(), 74);
+    long timestamp = 1451606400;
+    for (int i = 0; i < db.keys.size(); i++) {
+      assertEquals(db.keys.get(i), "AAAA");
+      assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
+      assertEquals(Utils.bytesToLong(db.values.get(i).get(
+          TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
+      assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT));
+      if (i % 2 == 0) {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
+      } else {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
+        timestamp += 60;
+      }
+    }
+    
+    threadState = wl.initThread(p, 1, 2);
+    db = new MockDB();
+    for (int i = 0; i < 74; i++) {
+      assertTrue(wl.doInsert(db, threadState));
+    }
+    
+    assertEquals(db.keys.size(), 74);
+    assertEquals(db.values.size(), 74);
+    timestamp = 1451606400;
+    for (int i = 0; i < db.keys.size(); i++) {
+      assertEquals(db.keys.get(i), "AAAB");
+      assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
+      assertEquals(Utils.bytesToLong(db.values.get(i).get(
+          TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
+      assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT));
+      if (i % 2 == 0) {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
+      } else {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
+        timestamp += 60;
+      }
+    }
+  }
+  
+  @Test (expectedExceptions = WorkloadException.class)
+  public void badTimeUnit() throws Exception {
+    final Properties p = new Properties();
+    p.put(TimeSeriesWorkload.TIMESTAMP_UNITS_PROPERTY, "foobar");
+    getWorkload(p, true);
+  }
+  
+  @Test (expectedExceptions = WorkloadException.class)
+  public void failedToInitWorkloadBeforeThreadInit() throws Exception {
+    final Properties p = getUTProperties();
+    final TimeSeriesWorkload wl = getWorkload(p, false);
+    //wl.init(p); // <-- we NEED this :(
+    final Object threadState = wl.initThread(p, 0, 2);
+    
+    final MockDB db = new MockDB();
+    wl.doInsert(db, threadState);
+  }
+  
+  @Test (expectedExceptions = IllegalStateException.class)
+  public void failedToInitThread() throws Exception {
+    final Properties p = getUTProperties();
+    final TimeSeriesWorkload wl = getWorkload(p, true);
+    
+    final MockDB db = new MockDB();
+    wl.doInsert(db, null);
+  }
+  
+  @Test
+  public void insertOneKeyTwoTagsLowCardinality() throws Exception {
+    final Properties p = getUTProperties();
+    p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1");
+    final TimeSeriesWorkload wl = getWorkload(p, true);
+    final Object threadState = wl.initThread(p, 0, 1);
+    
+    final MockDB db = new MockDB();
+    for (int i = 0; i < 74; i++) {
+      assertTrue(wl.doInsert(db, threadState));
+    }
+    
+    assertEquals(db.keys.size(), 74);
+    assertEquals(db.values.size(), 74);
+    long timestamp = 1451606400;
+    for (int i = 0; i < db.keys.size(); i++) {
+      assertEquals(db.keys.get(i), "AAAA");
+      assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
+      assertEquals(Utils.bytesToLong(db.values.get(i).get(
+          TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
+      assertTrue(((NumericByteIterator) db.values.get(i)
+          .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
+      if (i % 2 == 0) {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
+      } else {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
+        timestamp += 60;
+      }
+    }
+  }
+  
+  @Test
+  public void insertTwoKeysTwoTagsLowCardinality() throws Exception {
+    final Properties p = getUTProperties();
+    
+    final TimeSeriesWorkload wl = getWorkload(p, true);
+    final Object threadState = wl.initThread(p, 0, 1);
+    
+    final MockDB db = new MockDB();
+    for (int i = 0; i < 74; i++) {
+      assertTrue(wl.doInsert(db, threadState));
+    }
+    
+    assertEquals(db.keys.size(), 74);
+    assertEquals(db.values.size(), 74);
+    long timestamp = 1451606400;
+    int metricCtr = 0;
+    for (int i = 0; i < db.keys.size(); i++) {
+      assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
+      assertEquals(Utils.bytesToLong(db.values.get(i).get(
+          TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
+      assertTrue(((NumericByteIterator) db.values.get(i)
+          .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
+      if (i % 2 == 0) {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
+      } else {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
+      }
+      if (metricCtr++ > 1) {
+        assertEquals(db.keys.get(i), "AAAB");
+        if (metricCtr >= 4) {
+          metricCtr = 0;
+          timestamp += 60;
+        }
+      } else {
+        assertEquals(db.keys.get(i), "AAAA");
+      }
+    }
+  }
+  
+  @Test
+  public void insertTwoKeysTwoThreads() throws Exception {
+    final Properties p = getUTProperties();
+    
+    final TimeSeriesWorkload wl = getWorkload(p, true);
+    Object threadState = wl.initThread(p, 0, 2);
+    
+    MockDB db = new MockDB();
+    for (int i = 0; i < 74; i++) {
+      assertTrue(wl.doInsert(db, threadState));
+    }
+    
+    assertEquals(db.keys.size(), 74);
+    assertEquals(db.values.size(), 74);
+    long timestamp = 1451606400;
+    for (int i = 0; i < db.keys.size(); i++) {
+      assertEquals(db.keys.get(i), "AAAA"); // <-- key 1
+      assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
+      assertEquals(Utils.bytesToLong(db.values.get(i).get(
+          TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
+      assertTrue(((NumericByteIterator) db.values.get(i)
+          .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
+      if (i % 2 == 0) {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
+      } else {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
+        timestamp += 60;
+      }
+    }
+    
+    threadState = wl.initThread(p, 1, 2);
+    db = new MockDB();
+    for (int i = 0; i < 74; i++) {
+      assertTrue(wl.doInsert(db, threadState));
+    }
+    
+    assertEquals(db.keys.size(), 74);
+    assertEquals(db.values.size(), 74);
+    timestamp = 1451606400;
+    for (int i = 0; i < db.keys.size(); i++) {
+      assertEquals(db.keys.get(i), "AAAB"); // <-- key 2
+      assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
+      assertEquals(Utils.bytesToLong(db.values.get(i).get(
+          TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
+      assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT));
+      if (i % 2 == 0) {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
+      } else {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
+        timestamp += 60;
+      }
+    }
+  }
+  
+  @Test
+  public void insertThreeKeysTwoThreads() throws Exception {
+    // To make sure the distribution doesn't miss any metrics
+    final Properties p = getUTProperties();
+    p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "3");
+    
+    final TimeSeriesWorkload wl = getWorkload(p, true);
+    Object threadState = wl.initThread(p, 0, 2);
+    
+    MockDB db = new MockDB();
+    for (int i = 0; i < 74; i++) {
+      assertTrue(wl.doInsert(db, threadState));
+    }
+    
+    assertEquals(db.keys.size(), 74);
+    assertEquals(db.values.size(), 74);
+    long timestamp = 1451606400;
+    for (int i = 0; i < db.keys.size(); i++) {
+      assertEquals(db.keys.get(i), "AAAA");
+      assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
+      assertEquals(Utils.bytesToLong(db.values.get(i).get(
+          TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
+      assertTrue(((NumericByteIterator) db.values.get(i)
+          .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
+      if (i % 2 == 0) {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
+      } else {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
+        timestamp += 60;
+      }
+    }
+    
+    threadState = wl.initThread(p, 1, 2);
+    db = new MockDB();
+    for (int i = 0; i < 74; i++) {
+      assertTrue(wl.doInsert(db, threadState));
+    }
+    
+    timestamp = 1451606400;
+    int metricCtr = 0;
+    for (int i = 0; i < db.keys.size(); i++) {
+      assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
+      assertEquals(Utils.bytesToLong(db.values.get(i).get(
+          TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
+      assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT));
+      if (i % 2 == 0) {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
+      } else {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
+      }
+      if (metricCtr++ > 1) {
+        assertEquals(db.keys.get(i), "AAAC");
+        if (metricCtr >= 4) {
+          metricCtr = 0;
+          timestamp += 60;
+        }
+      } else {
+        assertEquals(db.keys.get(i), "AAAB");
+      }
+    }
+  }
+  
+  @Test
+  public void insertWithValidation() throws Exception {
+    final Properties p = getUTProperties();
+    p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1");
+    p.put(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true");
+    p.put(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers");
+    final TimeSeriesWorkload wl = getWorkload(p, true);
+    final Object threadState = wl.initThread(p, 0, 1);
+    
+    final MockDB db = new MockDB();
+    for (int i = 0; i < 74; i++) {
+      assertTrue(wl.doInsert(db, threadState));
+    }
+    
+    assertEquals(db.keys.size(), 74);
+    assertEquals(db.values.size(), 74);
+    long timestamp = 1451606400;
+    for (int i = 0; i < db.keys.size(); i++) {
+      assertEquals(db.keys.get(i), "AAAA");
+      assertEquals(db.values.get(i).get("AA").toString(), "AAAA");
+      assertEquals(Utils.bytesToLong(db.values.get(i).get(
+          TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp);
+      assertFalse(((NumericByteIterator) db.values.get(i)
+          .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint());
+      
+      // validation check
+      final TreeMap<String, String> validationTags = new TreeMap<String, String>();
+      for (final Entry<String, ByteIterator> entry : db.values.get(i).entrySet()) {
+        if (entry.getKey().equals(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT) || 
+            entry.getKey().equals(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT)) {
+          continue;
+        }
+        validationTags.put(entry.getKey(), entry.getValue().toString());
+      }
+      assertEquals(wl.validationFunction(db.keys.get(i), timestamp, validationTags), 
+          ((NumericByteIterator) db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).getLong());
+      
+      if (i % 2 == 0) {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAA");
+      } else {
+        assertEquals(db.values.get(i).get("AB").toString(), "AAAB");
+        timestamp += 60;
+      }
+    }
+  }
+  
+  @Test
+  public void read() throws Exception {
+    final Properties p = getUTProperties();
+    final TimeSeriesWorkload wl = getWorkload(p, true);
+    final Object threadState = wl.initThread(p, 0, 1);
+    
+    final MockDB db = new MockDB();
+    for (int i = 0; i < 20; i++) {
+      wl.doTransactionRead(db, threadState);
+    }
+  }
+  
+  @Test
+  public void verifyRow() throws Exception {
+    final Properties p = getUTProperties();
+    final TimeSeriesWorkload wl = getWorkload(p, true);
+    
+    final TreeMap<String, String> validationTags = new TreeMap<String, String>();
+    final HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
+    
+    validationTags.put("AA", "AAAA");
+    cells.put("AA", new StringByteIterator("AAAA"));
+    validationTags.put("AB", "AAAB");
+    cells.put("AB", new StringByteIterator("AAAB"));
+    long hash = wl.validationFunction("AAAA", 1451606400L, validationTags);
+        
+    cells.put(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT, new NumericByteIterator(1451606400L));
+    cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash));
+    
+    assertEquals(wl.verifyRow("AAAA", cells), Status.OK);
+    
+    // tweak the last value a bit
+    for (final ByteIterator it : cells.values()) {
+      it.reset();
+    }
+    cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash + 1));
+    assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE);
+    
+    // no value cell, returns an unexpected state
+    for (final ByteIterator it : cells.values()) {
+      it.reset();
+    }
+    cells.remove(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT);
+    assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE);
+  }
+  
+  @Test
+  public void validateSettingsDataIntegrity() throws Exception {
+    Properties p = getUTProperties();
+    
+    // data validation incompatibilities
+    p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true");
+    try {
+      getWorkload(p, true);
+      fail("Expected WorkloadException");
+    } catch (WorkloadException e) { }
+    
+    p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers"); // now it's ok
+    p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, "sum"); // now it's not
+    try {
+      getWorkload(p, true);
+      fail("Expected WorkloadException");
+    } catch (WorkloadException e) { }
+    
+    p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, "");
+    p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, "sum");
+    p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, "60");
+    try {
+      getWorkload(p, true);
+      fail("Expected WorkloadException");
+    } catch (WorkloadException e) { }
+    
+    p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, "");
+    p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, "");
+    p.setProperty(TimeSeriesWorkload.QUERY_TIMESPAN_PROPERTY, "60");
+    try {
+      getWorkload(p, true);
+      fail("Expected WorkloadException");
+    } catch (WorkloadException e) { }
+    
+    p = getUTProperties();
+    p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true");
+    p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers");
+    p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "true");
+    try {
+      getWorkload(p, true);
+      fail("Expected WorkloadException");
+    } catch (WorkloadException e) { }
+    
+    p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false");
+    p.setProperty(TimeSeriesWorkload.INSERT_START_PROPERTY, "");
+    try {
+      getWorkload(p, true);
+      fail("Expected WorkloadException");
+    } catch (WorkloadException e) { }
+  }
+  
+  /** Helper method that generates unit testing defaults for the properties map */
+  private Properties getUTProperties() {
+    final Properties p = new Properties();
+    p.put(Client.RECORD_COUNT_PROPERTY, "10");
+    p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "2");
+    p.put(CoreWorkload.FIELD_LENGTH_PROPERTY, "4");
+    p.put(TimeSeriesWorkload.TAG_KEY_LENGTH_PROPERTY, "2");
+    p.put(TimeSeriesWorkload.TAG_VALUE_LENGTH_PROPERTY, "4");
+    p.put(TimeSeriesWorkload.TAG_COUNT_PROPERTY, "2");
+    p.put(TimeSeriesWorkload.TAG_CARDINALITY_PROPERTY, "1,2");
+    p.put(CoreWorkload.INSERT_START_PROPERTY, "1451606400");
+    p.put(TimeSeriesWorkload.DELAYED_SERIES_PROPERTY, "0");
+    p.put(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false");
+    return p;
+  }
+  
+  /** Helper to setup the workload for testing. */
+  private TimeSeriesWorkload getWorkload(final Properties p, final boolean init) 
+      throws WorkloadException {
+    Measurements.setProperties(p);
+    if (!init) {
+      return new TimeSeriesWorkload();
+    } else {
+      final TimeSeriesWorkload workload = new TimeSeriesWorkload();
+      workload.init(p);
+      return workload;
+    }
+  }
+  
+  static class MockDB extends DB {
+    final List<String> keys = new ArrayList<String>();
+    final List<Map<String, ByteIterator>> values = 
+        new ArrayList<Map<String, ByteIterator>>();
+    
+    @Override
+    public Status read(String table, String key, Set<String> fields,
+                       Map<String, ByteIterator> result) {
+      return Status.OK;
+    }
+
+    @Override
+    public Status scan(String table, String startkey, int recordcount,
+        Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
+      return Status.OK;
+    }
+
+    @Override
+    public Status update(String table, String key,
+        Map<String, ByteIterator> values) {
+      return Status.OK;
+    }
+
+    @Override
+    public Status insert(String table, String key,
+        Map<String, ByteIterator> values) {
+      keys.add(key);
+      this.values.add(values);
+      return Status.OK;
+    }
+
+    @Override
+    public Status delete(String table, String key) {
+      return Status.OK;
+    }
+    
+    public void dumpStdout() {
+      for (int i = 0; i < keys.size(); i++) {
+        System.out.print("[" + i + "] Key: " + keys.get(i) + " Values: {");
+        int x = 0;
+        for (final Entry<String, ByteIterator> entry : values.get(i).entrySet()) {
+          if (x++ > 0) {
+            System.out.print(", ");
+          }
+          System.out.print("{" + entry.getKey() + " => ");
+          if (entry.getKey().equals("YCSBV")) {
+            System.out.print(new String(Utils.bytesToDouble(entry.getValue().toArray()) + "}"));  
+          } else if (entry.getKey().equals("YCSBTS")) {
+            System.out.print(new String(Utils.bytesToLong(entry.getValue().toArray()) + "}"));
+          } else {
+            System.out.print(new String(entry.getValue().toArray()) + "}");
+          }
+        }
+        System.out.println("}");
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/workloads/tsworkload_template b/workloads/tsworkload_template
new file mode 100644
index 0000000000000000000000000000000000000000..9b79a0096fb8bc676c4185a5cc39864e75e97db6
--- /dev/null
+++ b/workloads/tsworkload_template
@@ -0,0 +1,283 @@
+# Copyright (c) 2017 YCSB contributors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you
+# may not use this file except in compliance with the License. You
+# may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License. See accompanying
+# LICENSE file.
+
+# Yahoo! Cloud System Benchmark
+# Time Series Workload Template: Default Values
+#
+# File contains all properties that can be set to define a
+# YCSB session. All properties are set to their default
+# value if one exists. If not, the property is commented
+# out. When a property has a finite number of settings,
+# the default is enabled and the alternates are shown in
+# comments below it.
+# 
+# Use of each property is explained through comments in Client.java, 
+# CoreWorkload.java, TimeSeriesWorkload.java or on the YCSB wiki page:
+# https://github.com/brianfrankcooper/YCSB/wiki/TimeSeriesWorkload
+
+# The name of the workload class to use. Always the following.
+workload=com.yahoo.ycsb.workloads.TimeSeriesWorkload
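+
+# As a quick sanity check (exact paths and binding availability depend on
+# your checkout), this template can be run against the in-memory BasicTSDB
+# client, e.g.:
+#   bin/ycsb load basicts -P workloads/tsworkload_template -p recordcount=1000
+#   bin/ycsb run basicts -P workloads/tsworkload_template -p operationcount=1000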
+
+# The default is Java's Long.MAX_VALUE.
+# The number of records in the table to be inserted in
+# the load phase or the number of records already in the 
+# table before the run phase.
+recordcount=1000000
+
+# There is no default setting for operationcount but it is
+# required to be set.
+# The number of operations to use during the run phase.
+operationcount=3000000
+
+# The number of insertions to do, if different from recordcount.
+# Used with insertstart to grow an existing table.
+#insertcount=
+
+# ..::NOTE::.. This is different from the CoreWorkload!
+# The starting timestamp of a run as a Unix Epoch numeral in the 
+# unit set in 'timestampunits'. This is used to determine the first
+# timestamp to write or query as well as how many offsets are
+# available (based on 'timestampinterval').
+#insertstart=
+
+# The units represented by the 'insertstart' timestamp as well as
+# durations such as 'timestampinterval', 'querytimespan', etc.
+# For values, see https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TimeUnit.html
+# Note that only seconds through nanoseconds are supported.
+timestampunits=SECONDS
+
+# The amount of time between each value in every time series in
+# the units of 'timestampunits'.
+timestampinterval=60
+
+# ..::NOTE::.. This is different from the CoreWorkload!
+# Represents the number of unique "metrics" or "keys" for time series.
+# E.g. "sys.cpu" may be a single field or "metric" while there may be many
+# time series sharing that key (perhaps a host tag with "web01" and "web02"
+# as options).
+fieldcount=16
+
+# The number of characters in the "metric" or "key".
+fieldlength=8
+
+# --- TODO ---?
+# The distribution used to choose the length of a field
+fieldlengthdistribution=constant
+#fieldlengthdistribution=uniform
+#fieldlengthdistribution=zipfian
+
+# The number of unique tag combinations for each time series. E.g
+# if this value is 4, each record will have a key and 4 tag combinations
+# such as A=A, B=A, C=A, D=A.
+tagcount=4
+
+# The cardinality (number of unique values) of each tag key for 
+# every "metric" or field as a comma separated list. Each value must 
+# be a number from 1 to Java's Integer.MAX_VALUE and there should be 
+# 'tagcount' values. Values beyond 'tagcount' are ignored, and any 
+# missing values default to a cardinality of 1.
+tagcardinality=1,2,4,8
+
+# The length of each tag key in characters.
+tagkeylength=8
+
+# The length of each tag value in characters.
+tagvaluelength=8
+
+# The character separating tag keys from tag values when reads, deletes
+# or scans are executed against a database. The default is the equals sign
+# so a field passed in a read to a DB may look like 'AA=AB'.
+tagpairdelimiter==
+
+# The delimiter between keys and tags when a delete is passed to the DB.
+# E.g. if there was a key and a field, the request key would look like:
+# 'AA:AA=AB'
+deletedelimiter=:
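+# For example, with the defaults above, deleting a single data point of the
+# series with key 'AAAA' and tags AA=AAAA, AB=AAAB at timestamp 1451606400
+# might pass a key along the lines of 'AAAA:AA=AAAA:AB=AAAB:YCSBTS=1451606400'
+# to the DB's delete() call (the exact layout is assembled by the workload).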
+
+# Whether or not to randomize the timestamp order when performing inserts
+# and updates against a DB. By default writes proceed with timestamps 
+# moving linearly forward in time, advancing one interval once every 
+# time series has been written for the current timestamp.
+randomwritetimestamporder=false
+
+# Whether or not to randomly shuffle the time series order when writing.
+# This will shuffle the keys, tag keys and tag values.
+# ************************************************************************
+# WARNING - When this is enabled, reads and scans will likely return many
+# empty results as invalid tag combinations will be chosen. Likewise 
+# this setting is INCOMPATIBLE with data integrity checks.
+# ************************************************************************
+randomtimeseriesorder=false
+
+# The type of numerical data generated for each data point: 64 bit signed 
+# integers, double precision floating point values or a random mix of both.
+# When 'dataintegrity' is enabled this must be set to 'integers'; other 
+# settings will be rejected at startup.
+#valuetype=integers
+valuetype=floats
+#valuetype=mixed
+
+# A value from 0 to 0.999999 representing how sparse each time series
+# should be. The higher this value, the greater the time interval between
+# values in a single series. For example, if sparsity is 0 and there are
+# 10 time series with a 'timestampinterval' of 60 seconds with a total
+# time range of 10 intervals, you would see 100 values written, one per
+# timestamp interval per time series. If the sparsity is 0.50 then there
+# would be only about 50 values written so some time series would have
+# missing values at each interval.
+sparsity=0.00
+
+# The percentage of time series that are "lagging" behind the current
+# timestamp of the writer. This is used to mimic a common behavior where
+# most sources (agents, sensors, etc) are writing data in sync (same timestamp)
+# but a subset are running behind due to buffering, latency issues, etc.
+delayedSeries=0.10
+
+# The maximum amount of delay for delayed series in interval counts. The 
+# actual delay is chosen based on a modulo of the series index.
+delayedIntervals=5
+
+# The fixed or maximum amount of time added to the start time of a 
+# read or scan operation to generate a query over a range of time 
+# instead of a single timestamp. Units are shared with 'timestampunits'.
+# For example if the value is set to 3600 seconds (1 hour) then 
+# each read would pick a random start timestamp based on the 
+# 'insertstart' value and number of intervals, then add 3600 seconds
+# to create the end time of the query. If this value is 0 then reads
+# will only provide a single timestamp. 
+# WARNING: Cannot be used with 'dataintegrity'.
+querytimespan=0
+
+# Whether or not reads should choose a random time span (aligned to
+# the 'timestampinterval' value) for each read or scan request starting
+# at 0 and reaching 'querytimespan' as the max.
+queryrandomtimespan=false
+
+# A delimiter character used to separate the start and end timestamps
+# of a read query when 'querytimespan' is enabled.
+querytimespandelimiter=,
+
+# A unique key given to read, scan and delete operations when the
+# operation should perform a group-by (multi-series aggregation) on one 
+# or more tags. If 'groupbyfunction' is set, this key will be given with
+# the configured function.
+groupbykey=YCSBGB
+
+# A function name (e.g. 'sum', 'max' or 'avg') passed during reads, 
+# scans and deletes to cause the database to perform a group-by 
+# operation on one or more tags. If this value is empty or null 
+# (default), group-by operations are not performed.
+#groupbyfunction=
+
+# A comma separated list of 0s or 1s to denote which of the tag keys
+# should be grouped during group-by operations. The number of values
+# must match the number of tags in 'tagcount'.
+#groupbykeys=0,0,1,1
+
+# A unique key given to read and scan operations when the operation
+# should downsample the results of a query into lower resolution
+# data. If 'downsamplingfunction' is set, this key will be given with
+# the configured function.
+downsamplingkey=YCSBDS
+
+# A function name (e.g. 'sum', 'max' or 'avg') passed during reads and
+# scans to cause the database to perform a downsampling operation
+# returning lower resolution data. If this value is empty or null 
+# (default), downsampling is not performed.
+#downsamplingfunction=
+
+# A time interval for which to downsample the raw data into. Shares
+# the same units as 'timestampinterval'. This value must be greater
+# than 'timestampinterval'. E.g. if the timestamp interval for raw
+# data is 60 seconds, the downsampling interval could be 3600 seconds
+# to roll up the data into 1 hour buckets.
+#downsamplinginterval=
+
+# What proportion of operations are reads
+readproportion=0.10
+
+# What proportion of operations are updates
+updateproportion=0.00
+
+# What proportion of operations are inserts
+insertproportion=0.90
+
+# The distribution of requests across the keyspace
+requestdistribution=zipfian
+#requestdistribution=uniform
+#requestdistribution=latest
+
+# The name of the database table to run queries against
+table=usertable
+
+# Whether or not data should be validated during writes and reads. If
+# set, 'valuetype' must be 'integers' and each value written is a 
+# deterministic hash of the tags and timestamp that is verified on read.
+# Also requires 'insertstart' to be set.
+dataintegrity=false
+
+# How the latency measurements are presented
+measurementtype=histogram
+#measurementtype=timeseries
+#measurementtype=raw
+# When measurementtype is set to raw, measurements will be output
+# as RAW datapoints in the following csv format:
+# "operation, timestamp of the measurement, latency in us"
+#
+# Raw datapoints are collected in-memory while the test is running. Each
+# data point consumes about 50 bytes (including java object overhead).
+# For a typical run of 1 million to 10 million operations, this should
+# fit into memory most of the time. If you plan to do 100s of millions of
+# operations per run, consider provisioning a machine with larger RAM when using
+# the RAW measurement type, or split the run into multiple runs.
+#
+# Optionally, you can specify an output file to save raw datapoints.
+# Otherwise, raw datapoints will be written to stdout.
+# The output file will be appended to if it already exists, otherwise
+# a new output file will be created.
+#measurement.raw.output_file = /tmp/your_output_file_for_this_run
+
+# JVM Reporting.
+#
+# Measure JVM information over time including GC counts, max and min memory
+# used, max and min thread counts, max and min system load and others. This
+# setting must be enabled in conjunction with the "-s" flag to run the status
+# thread. Every "status.interval", the status thread will capture JVM 
+# statistics and record the results. At the end of the run, max and mins will
+# be recorded.
+# measurement.trackjvm = false
+
+# The range of latencies to track in the histogram (milliseconds)
+histogram.buckets=1000
+
+# Granularity for time series (in milliseconds)
+timeseries.granularity=1000
+
+# Latency reporting.
+#
+# YCSB records latency of failed operations separately from successful ones.
+# Latency of all OK operations will be reported under their operation name,
+# such as [READ], [UPDATE], etc.
+#
+# For failed operations:
+# By default we don't track latency numbers of specific error status.
+# We just report latency of all failed operation under one measurement name
+# such as [READ-FAILED]. But optionally, user can configure to have either:
+# 1. Record and report latency for each and every error status code by
+#    setting reportLatencyForEachError to true, or
+# 2. Record and report latency for a select set of error status codes by
+#    providing a CSV list of Status codes via the "latencytrackederrors"
+#    property.
+# reportlatencyforeacherror=false
+# latencytrackederrors="<comma separated strings of error codes>"
diff --git a/workloads/tsworkloada b/workloads/tsworkloada
new file mode 100644
index 0000000000000000000000000000000000000000..868007f2fc4f36bd30a3a34f4356915324f76bc0
--- /dev/null
+++ b/workloads/tsworkloada
@@ -0,0 +1,46 @@
+# Copyright (c) 2017 YCSB contributors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you
+# may not use this file except in compliance with the License. You
+# may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License. See accompanying
+# LICENSE file.
+
+# Yahoo! Cloud System Benchmark
+# Workload A: Small cardinality consistent data for 2 days
+#   Application example: Typical monitoring of a single compute or small 
+#   sensor station where 90% of the load is write and only 10% is read 
+#   (it's usually much less). All writes are inserts. No sparsity so 
+#   every series will have a value at every timestamp.
+#
+#   Read/insert ratio: 10/90
+#   Cardinality: 16 per key (field), 64 fields for a total of 1,024 
+#                time series.
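+#
+#   A typical invocation (binding name and paths are illustrative):
+#     bin/ycsb load basicts -P workloads/tsworkloada
+#     bin/ycsb run basicts -P workloads/tsworkloada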
+workload=com.yahoo.ycsb.workloads.TimeSeriesWorkload
+
+recordcount=1474560
+operationcount=2949120
+
+fieldlength=8
+fieldcount=64
+tagcount=4
+tagcardinality=1,2,4,2
+
+sparsity=0.0
+delayedSeries=0.0
+delayedIntervals=0
+
+timestampunits=SECONDS
+timestampinterval=60
+querytimespan=3600
+
+readproportion=0.10
+updateproportion=0.00
+insertproportion=0.90