Commit b634f0cd authored by nitsanw, committed by Sean Busbey

[core] - Output intended vs. per-op latency or both

- Output old histogram or HdrHistogram or both
- Potentially output HdrHistogram to log file on status interval.
parent e267ce39
YCSB in its original form suffers from Coordinated Omission[1]:
When used as a latency under load benchmark YCSB in its original form suffers from
Coordinated Omission[1] and related measurement issues:
* Load is controlled by response time
* Measurement does not account for missing time
* Measurement starts at beginning of request rather than at intended beginning
To provide a minimal correction patch I propose:
1. Replace internal histogram implementation with HdrHistogram:
* Measurement is limited in scope as the histogram does not provide data on overflow values
To provide a minimal correction patch the following were implemented:
1. Replace internal histogram implementation with HdrHistogram[2]:
HdrHistogram offers a dynamic range of measurement at a given precision and will
improve the fidelity of reporting. It allows capturing a much wider range of latencies.
We could add controls for corrected/uncorrected measurement. This is appropriate for a
uniform load test, but not for other loads. The mixing of different Ops also makes
this correction unreliable. This could work for a global histogram.
HdrHistogram also supports compressed lossless serialization, which enables capturing
snapshot histograms from which lower resolution histograms can be constructed for plotting
latency over time. Snapshot interval histograms are serialized on status reporting, which
must be enabled using the '-s' option.
2. Track intended operation start and report latencies from that point in time:
This will require the introduction of a new measurement point and will inevitably
include measuring some of the internal preparation steps of the load generator.
Assuming the benchmark sets a target schedule of execution in which every operation
is supposed to happen at a given time the benchmark should measure the latency between
intended start time and operation completion.
This required the introduction of a new measurement point and inevitably
includes measuring some of the internal preparation steps of the load generator.
This overhead should be negligible in the context of a network hop, but could
be corrected for by estimating the load-generator overheads (e.g. by measuring a
no-op DB).
no-op DB or by measuring the setup time for an operation and deducting that from total).
This intended measurement point is only used when there is a target load (specified by
the -target parameter).
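The difference between the two measurement points can be illustrated with a small simulation. This is only a sketch under stated assumptions: the class and method names are hypothetical and not part of YCSB, the clock is simulated rather than read from System.nanoTime(), and the service times are made up. It mirrors the idea of a per-op deadline of start + i * tick and reports, for each op, the latency from actual start and the latency from intended start.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; none of these names come from YCSB.
class IntendedLatencySketch {

    /**
     * Simulates one client thread on a fixed schedule of one op per tickNs.
     * serviceNs[i] is the simulated time the DB takes to serve op i.
     * Returns, per op, {op latency, intended latency} in microseconds.
     */
    static long[][] simulate(long tickNs, long[] serviceNs) {
        long[][] result = new long[serviceNs.length][2];
        long startNs = 0L;      // simulated clock origin
        long nowNs = startNs;   // simulated current time
        for (int i = 0; i < serviceNs.length; i++) {
            long deadlineNs = startNs + i * tickNs;            // intended start
            long actualStartNs = Math.max(nowNs, deadlineNs);  // wait for the tick, or start late
            long endNs = actualStartNs + serviceNs[i];
            result[i][0] = (endNs - actualStartNs) / 1000;     // measured from actual start
            result[i][1] = (endNs - deadlineNs) / 1000;        // measured from intended start
            nowNs = endNs;
        }
        return result;
    }

    public static void main(String[] args) {
        long tickNs = TimeUnit.MILLISECONDS.toNanos(1); // roughly "-target 1000" on one thread
        // Assumed service times: the first op stalls for 5ms, the rest take 100us.
        long[] serviceNs = {5_000_000L, 100_000L, 100_000L, 100_000L,
                            100_000L, 100_000L, 100_000L};
        long[][] r = simulate(tickNs, serviceNs);
        for (int i = 0; i < r.length; i++) {
            System.out.printf("op %d: op=%dus intended=%dus%n", i, r[i][0], r[i][1]);
        }
    }
}
```

With these inputs every op after the stall reports an op latency of 100us, while the intended latencies of ops 1 through 5 are 4100, 3200, 2300, 1400 and 500us. That backlog is the missing time that measuring from the actual start hides.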
This branch supports the following new options:
* -p measurementtype=[histogram|hdrhistogram|hdrhistogram+histogram|timeseries] (default=histogram)
The new measurement types are hdrhistogram and hdrhistogram+histogram. Default is still
histogram, which is the old histogram. Ultimately we would remove the old measurement types
and use only HdrHistogram, but the old measurement is left in there for comparison's sake.
* -p measurement.interval=[op|intended|both] (default=op)
This new option differentiates between measured intervals and adds the intended interval (as described
above), and the option to record both the op and intended for comparison.
* -p hdrhistogram.fileoutput=[true|false] (default=false)
This new option will enable periodic writes of the interval histogram into an output file. The path can be set using '-p hdrhistogram.output.path=<PATH>'.
Example parameters:
-target 1000 -s -p workload=com.yahoo.ycsb.workloads.CoreWorkload -p basicdb.verbose=false -p basicdb.simulatedelay=4 -p measurement.interval=both -p measurementtype=hdrhistogram -p hdrhistogram.fileoutput=true -p maxexecutiontime=60
Further correction suggestions:
1. Correct load control: currently, after a pause, the load generator will do
operations back to back to catch up; this leads to a flat-out throughput mode
of testing as opposed to controlled load.
2. Move to async model: Scenarios where Ops have no dependency could delegate the
Op execution to a threadpool and thus separate the request rate control from the
synchronous execution of Ops. Measurement would start on queuing for execution.
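The async model in suggestion 2 could look roughly like the sketch below. It is not part of this patch, and every name in it is hypothetical. It assumes ops are independent, paces submissions on a fixed tick, hands each op to a thread pool, and measures latency from the moment the op is queued, so a slow op does not delay the start of the ones after it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.LockSupport;

// Hypothetical illustration; none of these names come from YCSB.
class AsyncLoadSketch {

    /**
     * Submits opCount copies of op, one per tickNs, to a pool of the given size.
     * Returns the latency of each op in microseconds, measured from queuing.
     */
    static List<Long> run(int opCount, long tickNs, int threads, Runnable op) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<Long>> pending = new ArrayList<>();
        long startNs = System.nanoTime();
        try {
            for (int i = 0; i < opCount; i++) {
                long deadlineNs = startNs + i * tickNs;
                long waitNs;
                // Rate control: wait for the tick, independent of op completion.
                while ((waitNs = deadlineNs - System.nanoTime()) > 0) {
                    LockSupport.parkNanos(waitNs);
                }
                final long queuedNs = System.nanoTime(); // measurement starts on queuing
                pending.add(pool.submit(() -> {
                    op.run();
                    return (System.nanoTime() - queuedNs) / 1000;
                }));
            }
            List<Long> latenciesUs = new ArrayList<>();
            for (Future<Long> f : pending) {
                latenciesUs.add(f.get());
            }
            return latenciesUs;
        } catch (Exception e) {
            throw new RuntimeException("op failed or was interrupted", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a DB call: sleep for 2ms.
        Runnable fakeOp = () -> {
            try {
                Thread.sleep(2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(run(10, 1_000_000L, 4, fakeOp));
    }
}
```

If the pool is too small for the offered load, ops wait in the executor queue and that wait shows up in the reported latency, which is the intended effect of starting the measurement at queuing time.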
1. https://groups.google.com/forum/#!msg/mechanical-sympathy/icNZJejUHfE/BfDekfBEs_sJ
2. https://github.com/HdrHistogram/HdrHistogram
\ No newline at end of file
......@@ -154,8 +154,6 @@ class ClientThread extends Thread
Properties _props;
long _targetOpsTickNs;
final Measurements _measurements;
final boolean _measureFromIntendedDeadline;
/**
* Constructor.
......@@ -169,7 +167,7 @@ class ClientThread extends Thread
* @param opcount the number of operations (transactions or inserts) to do
* @param targetperthreadperms target number of operations per thread per ms
*/
public ClientThread(DB db, boolean dotransactions, Workload workload, int threadid, int threadcount, Properties props, int opcount, double targetperthreadperms, boolean measureFromIntendedDeadline)
public ClientThread(DB db, boolean dotransactions, Workload workload, int threadid, int threadcount, Properties props, int opcount, double targetperthreadperms)
{
//TODO: consider removing threadcount and threadid
_db=db;
......@@ -185,8 +183,6 @@ class ClientThread extends Thread
_threadcount=threadcount;
_props=props;
_measurements = Measurements.getMeasurements();
//System.out.println("Interval = "+interval);
_measureFromIntendedDeadline = false;
}
public int getOpsDone()
......@@ -299,8 +295,7 @@ class ClientThread extends Thread
// delay until next tick
long deadline = startTimeNanos + _opsdone*_targetOpsTickNs;
sleepUntil(deadline);
if(_measureFromIntendedDeadline)
_measurements.setIntendedStartTimeNs(deadline);
_measurements.setIntendedStartTimeNs(deadline);
}
}
......@@ -368,9 +363,6 @@ public class Client
*/
public static final String MAX_EXECUTION_TIME = "maxexecutiontime";
private static final String CO_CORRECT_PROPERTY = "co.correct";
private static final String DEFAULT_CO_CORRECT = "true";
public static void usageMessage()
{
......@@ -735,7 +727,6 @@ public class Client
opcount=Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT));
}
}
boolean measureFromIntendedDeadline = Boolean.parseBoolean(props.getProperty(CO_CORRECT_PROPERTY, DEFAULT_CO_CORRECT));
Vector<Thread> threads=new Vector<Thread>();
for (int threadid=0; threadid<threadcount; threadid++)
......@@ -752,7 +743,7 @@ public class Client
}
Thread t=new ClientThread(db,dotransactions,workload,threadid,threadcount,props,opcount/threadcount,targetperthreadperms, measureFromIntendedDeadline);
Thread t=new ClientThread(db,dotransactions,workload,threadid,threadcount,props,opcount/threadcount, targetperthreadperms);
threads.add(t);
//t.start();
......
......@@ -69,11 +69,11 @@ public class DBWrapper extends DB
*/
public void cleanup() throws DBException
{
long ist=_measurements.getIntendedtartTimeNs();
long st = System.nanoTime();
long ist=_measurements.getIntendedtartTimeNs();
long st = System.nanoTime();
_db.cleanup();
long en=System.nanoTime();
measure(ist, st, en);
long en=System.nanoTime();
measure("CLEANUP",ist, st, en);
}
/**
......@@ -91,7 +91,7 @@ public class DBWrapper extends DB
long st = System.nanoTime();
int res=_db.read(table,key,fields,result);
long en=System.nanoTime();
measure(ist, st, en);
measure("READ",ist, st, en);
_measurements.reportReturnCode("READ",res);
return res;
}
......@@ -112,14 +112,14 @@ public class DBWrapper extends DB
long st = System.nanoTime();
int res=_db.scan(table,startkey,recordcount,fields,result);
long en=System.nanoTime();
measure(ist, st, en);
measure("SCAN",ist, st, en);
_measurements.reportReturnCode("SCAN",res);
return res;
}
private void measure(long intendedStartTimeNanos, long startTimeNanos, long endTimeNanos) {
_measurements.measure("CLEANUP", (int)((endTimeNanos-startTimeNanos)/1000));
_measurements.measureIntended("CLEANUP", (int)((endTimeNanos-intendedStartTimeNanos)/1000));
private void measure(String op, long intendedStartTimeNanos, long startTimeNanos, long endTimeNanos) {
_measurements.measure(op, (int)((endTimeNanos-startTimeNanos)/1000));
_measurements.measureIntended(op, (int)((endTimeNanos-intendedStartTimeNanos)/1000));
}
/**
......@@ -137,7 +137,7 @@ public class DBWrapper extends DB
long st = System.nanoTime();
int res=_db.update(table,key,values);
long en=System.nanoTime();
measure(ist, st, en);
measure("UPDATE",ist, st, en);
_measurements.reportReturnCode("UPDATE",res);
return res;
}
......@@ -157,7 +157,7 @@ public class DBWrapper extends DB
long st = System.nanoTime();
int res=_db.insert(table,key,values);
long en=System.nanoTime();
measure(ist, st, en);
measure("INSERT",ist, st, en);
_measurements.reportReturnCode("INSERT",res);
return res;
}
......@@ -175,7 +175,7 @@ public class DBWrapper extends DB
long st = System.nanoTime();
int res=_db.delete(table,key);
long en=System.nanoTime();
measure(ist, st, en);
measure("DELETE",ist, st, en);
_measurements.reportReturnCode("DELETE",res);
return res;
}
......
......@@ -34,6 +34,8 @@ public class Measurements
public static final String MEASUREMENT_TYPE_PROPERTY = "measurementtype";
private static final String MEASUREMENT_TYPE_PROPERTY_DEFAULT = "histogram";
private static final String MEASUREMENT_INTERVAL = "measurement.interval";
private static final String MEASUREMENT_INTERVAL_DEFAULT = "op";
static Measurements singleton=null;
......@@ -56,10 +58,10 @@ public class Measurements
return singleton;
}
final ConcurrentHashMap<String,OneMeasurement> opToMesurementMap;
final ConcurrentHashMap<String,OneMeasurement> opToIntendedMesurementMap;
final int measurementType;
final ConcurrentHashMap<String,OneMeasurement> _opToMesurementMap;
final ConcurrentHashMap<String,OneMeasurement> _opToIntendedMesurementMap;
final int _measurementType;
final int _measurementInterval;
private Properties _props;
/**
......@@ -67,33 +69,54 @@ public class Measurements
*/
public Measurements(Properties props)
{
opToMesurementMap=new ConcurrentHashMap<String,OneMeasurement>();
opToIntendedMesurementMap=new ConcurrentHashMap<String,OneMeasurement>();
_opToMesurementMap=new ConcurrentHashMap<String,OneMeasurement>();
_opToIntendedMesurementMap=new ConcurrentHashMap<String,OneMeasurement>();
_props=props;
String mTypeString = _props.getProperty(MEASUREMENT_TYPE_PROPERTY, MEASUREMENT_TYPE_PROPERTY_DEFAULT);
if (mTypeString.equals("histogram"))
{
measurementType=0;
_measurementType = 0;
}
else if (mTypeString.equals("hdrhistogram"))
{
measurementType=1;
_measurementType = 1;
}
else if (mTypeString.equals("hdrhistogram+buckethistogram"))
else if (mTypeString.equals("hdrhistogram+histogram"))
{
measurementType=2;
_measurementType = 2;
}
else
else if (mTypeString.equals("timeseries"))
{
measurementType=3;
_measurementType = 3;
}
else {
throw new IllegalArgumentException("unknown "+MEASUREMENT_TYPE_PROPERTY+"="+mTypeString);
}
String mIntervalString = _props.getProperty(MEASUREMENT_INTERVAL, MEASUREMENT_INTERVAL_DEFAULT);
if (mIntervalString.equals("op"))
{
_measurementInterval = 0;
}
else if (mIntervalString.equals("intended"))
{
_measurementInterval = 1;
}
else if (mIntervalString.equals("both"))
{
_measurementInterval = 2;
}
else {
throw new IllegalArgumentException("unknown "+MEASUREMENT_INTERVAL+"="+mIntervalString);
}
}
OneMeasurement constructOneMeasurement(String name)
{
switch (measurementType)
switch (_measurementType)
{
case 0:
return new OneMeasurementHistogram(name, _props);
......@@ -119,17 +142,21 @@ public class Measurements
}
}
}
ThreadLocal<StartTimeHolder> tls = new ThreadLocal<Measurements.StartTimeHolder>(){
ThreadLocal<StartTimeHolder> tlIntendedStartTime = new ThreadLocal<Measurements.StartTimeHolder>(){
protected StartTimeHolder initialValue() {
return new StartTimeHolder();
};
};
};
public void setIntendedStartTimeNs(long time){
tls.get().time=time;
if(_measurementInterval==0)
return;
tlIntendedStartTime.get().time=time;
}
public long getIntendedtartTimeNs(){
return tls.get().startTime();
if(_measurementInterval==0)
return 0L;
return tlIntendedStartTime.get().startTime();
}
/**
......@@ -138,6 +165,8 @@ public class Measurements
*/
public void measure(String operation, int latency)
{
if(_measurementInterval==1)
return;
try
{
OneMeasurement m = getOpMeasurement(operation);
......@@ -157,6 +186,8 @@ public class Measurements
*/
public void measureIntended(String operation, int latency)
{
if(_measurementInterval==0)
return;
try
{
OneMeasurement m = getOpIntendedMeasurement(operation);
......@@ -172,11 +203,11 @@ public class Measurements
}
private OneMeasurement getOpMeasurement(String operation) {
OneMeasurement m = opToMesurementMap.get(operation);
OneMeasurement m = _opToMesurementMap.get(operation);
if(m == null)
{
m = constructOneMeasurement(operation);
OneMeasurement oldM = opToMesurementMap.putIfAbsent(operation, m);
OneMeasurement oldM = _opToMesurementMap.putIfAbsent(operation, m);
if(oldM != null)
{
m = oldM;
......@@ -185,11 +216,12 @@ public class Measurements
return m;
}
private OneMeasurement getOpIntendedMeasurement(String operation) {
OneMeasurement m = opToIntendedMesurementMap.get(operation);
OneMeasurement m = _opToIntendedMesurementMap.get(operation);
if(m == null)
{
m = constructOneMeasurement("Intended-"+operation);
OneMeasurement oldM = opToIntendedMesurementMap.putIfAbsent(operation, m);
final String name = _measurementInterval==1 ? operation : "Intended-" + operation;
m = constructOneMeasurement(name);
OneMeasurement oldM = _opToIntendedMesurementMap.putIfAbsent(operation, m);
if(oldM != null)
{
m = oldM;
......@@ -198,11 +230,13 @@ public class Measurements
return m;
}
/**
* Report a return code for a single DB operaiton.
* Report a return code for a single DB operation.
*/
public void reportReturnCode(String operation, int code)
{
OneMeasurement m = getOpMeasurement(operation);
OneMeasurement m = _measurementInterval==1 ?
getOpIntendedMeasurement(operation) :
getOpMeasurement(operation);
m.reportReturnCode(code);
}
......@@ -214,11 +248,11 @@ public class Measurements
*/
public void exportMeasurements(MeasurementsExporter exporter) throws IOException
{
for (OneMeasurement measurement : opToMesurementMap.values())
for (OneMeasurement measurement : _opToMesurementMap.values())
{
measurement.exportMeasurements(exporter);
}
for (OneMeasurement measurement : opToIntendedMesurementMap.values())
for (OneMeasurement measurement : _opToIntendedMesurementMap.values())
{
measurement.exportMeasurements(exporter);
}
......@@ -230,11 +264,11 @@ public class Measurements
public synchronized String getSummary()
{
String ret="";
for (OneMeasurement m : opToMesurementMap.values())
for (OneMeasurement m : _opToMesurementMap.values())
{
ret+=m.getSummary()+" ";
}
for (OneMeasurement m : opToIntendedMesurementMap.values())
for (OneMeasurement m : _opToIntendedMesurementMap.values())
{
ret+=m.getSummary()+" ";
}
......
......@@ -17,7 +17,11 @@
package com.yahoo.ycsb.measurements;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.text.DecimalFormat;
import java.util.Map;
import java.util.Properties;
......@@ -25,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogWriter;
import org.HdrHistogram.Recorder;
import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter;
......@@ -36,9 +41,11 @@ import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter;
*
*/
public class OneMeasurementHdrHistogram extends OneMeasurement {
Recorder histogram = new Recorder(3);
// we need one log per measurement histogram
final PrintStream log;
final HistogramLogWriter histogramLogWriter;
final Recorder histogram = new Recorder(3);
final ConcurrentHashMap<Integer, AtomicInteger> returncodes;
Histogram totalHistogram;
......@@ -46,6 +53,23 @@ public class OneMeasurementHdrHistogram extends OneMeasurement {
public OneMeasurementHdrHistogram(String name, Properties props) {
super(name);
returncodes = new ConcurrentHashMap<Integer, AtomicInteger>();
boolean shouldLog = Boolean.parseBoolean(props.getProperty("hdrhistogram.fileoutput", "false"));
if (!shouldLog) {
log = null;
histogramLogWriter = null;
return;
}
try {
final String hdrOutputFilename = props.getProperty("hdrhistogram.output.path", "") +name+".hdr";
log = new PrintStream(new FileOutputStream(hdrOutputFilename), false);
} catch (FileNotFoundException e) {
throw new RuntimeException("Failed to open hdr histogram output file",e);
}
histogramLogWriter = new HistogramLogWriter(log);
histogramLogWriter.outputComment("[Logging for: " + name + "]");
histogramLogWriter.outputLogFormatVersion();
histogramLogWriter.outputStartTime(System.currentTimeMillis());
histogramLogWriter.outputLegend();
}
/**
......@@ -84,7 +108,12 @@ public class OneMeasurementHdrHistogram extends OneMeasurement {
@Override
public void exportMeasurements(MeasurementsExporter exporter) throws IOException {
// accumulate the last interval which was not caught by status thread
getIntervalHistogramAndAccumulate();
Histogram intervalHistogram = getIntervalHistogramAndAccumulate();
if(histogramLogWriter != null) {
histogramLogWriter.outputIntervalHistogram(intervalHistogram);
// we can close now
log.close();
}
exporter.write(getName(), "Operations", totalHistogram.getTotalCount());
exporter.write(getName(), "AverageLatency(us)", totalHistogram.getMean());
exporter.write(getName(), "MinLatency(us)", totalHistogram.getMinValue());
......@@ -95,7 +124,6 @@ public class OneMeasurementHdrHistogram extends OneMeasurement {
for (Map.Entry<Integer, AtomicInteger> entry : returncodes.entrySet()) {
exporter.write(getName(), "Return=" + entry.getKey(), entry.getValue().get());
}
}
/**
......@@ -106,6 +134,11 @@ public class OneMeasurementHdrHistogram extends OneMeasurement {
@Override
public String getSummary() {
Histogram intervalHistogram = getIntervalHistogramAndAccumulate();
// we use the summary interval as the histogram file interval.
if(histogramLogWriter != null) {
histogramLogWriter.outputIntervalHistogram(intervalHistogram);
}
DecimalFormat d = new DecimalFormat("#.##");
return "[" + getName() +
": Count=" + intervalHistogram.getTotalCount() +
......
......@@ -353,6 +353,8 @@ public class CoreWorkload extends Workload
double scanproportion=Double.parseDouble(p.getProperty(SCAN_PROPORTION_PROPERTY,SCAN_PROPORTION_PROPERTY_DEFAULT));
double readmodifywriteproportion=Double.parseDouble(p.getProperty(READMODIFYWRITE_PROPORTION_PROPERTY,READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT));
recordcount=Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT));
if(recordcount == 0)
recordcount = Integer.MAX_VALUE;
String requestdistrib=p.getProperty(REQUEST_DISTRIBUTION_PROPERTY,REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
int maxscanlength=Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY,MAX_SCAN_LENGTH_PROPERTY_DEFAULT));
String scanlengthdistrib=p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY,SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
......