Skip to content
Snippets Groups Projects
Commit ccff6fc1 authored by nitsanw's avatar nitsanw Committed by Sean Busbey
Browse files

[core] Add intended deadline reporting and measurement.

Add option for controlling the feature(default on).
parent 6f49fd96
No related branches found
No related tags found
No related merge requests found
......@@ -152,7 +152,9 @@ class ClientThread extends Thread
int _threadcount;
Object _workloadstate;
Properties _props;
private long _targetOpsTickNs;
long _targetOpsTickNs;
final Measurements _measurements;
final boolean _measureFromIntendedDeadline;
/**
......@@ -167,7 +169,7 @@ class ClientThread extends Thread
* @param opcount the number of operations (transactions or inserts) to do
* @param targetperthreadperms target number of operations per thread per ms
*/
public ClientThread(DB db, boolean dotransactions, Workload workload, int threadid, int threadcount, Properties props, int opcount, double targetperthreadperms)
public ClientThread(DB db, boolean dotransactions, Workload workload, int threadid, int threadcount, Properties props, int opcount, double targetperthreadperms, boolean measureFromIntendedDeadline)
{
//TODO: consider removing threadcount and threadid
_db=db;
......@@ -182,7 +184,9 @@ class ClientThread extends Thread
_threadid=threadid;
_threadcount=threadcount;
_props=props;
_measurements = Measurements.getMeasurements();
//System.out.println("Interval = "+interval);
_measureFromIntendedDeadline = false;
}
public int getOpsDone()
......@@ -271,6 +275,7 @@ class ClientThread extends Thread
try
{
_measurements.setStartTimeNs(0);
_db.cleanup();
}
catch (DBException e)
......@@ -287,17 +292,18 @@ class ClientThread extends Thread
LockSupport.parkNanos(deadline - now);
}
}
private long throttleNanos(long startTimeNanos) {
private void throttleNanos(long startTimeNanos) {
//throttle the operations
if (_targetOpsPerMs > 0)
{
// delay until next tick
long deadline = startTimeNanos + _opsdone*_targetOpsTickNs;
sleepUntil(deadline);
return deadline;
if(_measureFromIntendedDeadline)
_measurements.setStartTimeNs(deadline);
}
return -1;
}
}
/**
......@@ -362,6 +368,10 @@ public class Client
*/
public static final String MAX_EXECUTION_TIME = "maxexecutiontime";
private static final String CO_CORRECT_PROPERTY = "co.correct";
private static final String DEFAULT_CO_CORRECT = "true";
public static void usageMessage()
{
System.out.println("Usage: java com.yahoo.ycsb.Client [options]");
......@@ -725,7 +735,7 @@ public class Client
opcount=Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT));
}
}
boolean measureFromIntendedDeadline = Boolean.parseBoolean(props.getProperty(CO_CORRECT_PROPERTY, DEFAULT_CO_CORRECT));
Vector<Thread> threads=new Vector<Thread>();
for (int threadid=0; threadid<threadcount; threadid++)
......@@ -741,7 +751,8 @@ public class Client
System.exit(0);
}
Thread t=new ClientThread(db,dotransactions,workload,threadid,threadcount,props,opcount/threadcount,targetperthreadperms);
Thread t=new ClientThread(db,dotransactions,workload,threadid,threadcount,props,opcount/threadcount,targetperthreadperms, measureFromIntendedDeadline);
threads.add(t);
//t.start();
......
......@@ -69,7 +69,7 @@ public class DBWrapper extends DB
*/
public void cleanup() throws DBException
{
long st=System.nanoTime();
long st=_measurements.startTimeNs();
_db.cleanup();
long en=System.nanoTime();
_measurements.measure("CLEANUP", (int)((en-st)/1000));
......@@ -86,7 +86,7 @@ public class DBWrapper extends DB
*/
public int read(String table, String key, Set<String> fields, HashMap<String,ByteIterator> result)
{
long st=System.nanoTime();
long st=_measurements.startTimeNs();
int res=_db.read(table,key,fields,result);
long en=System.nanoTime();
_measurements.measure("READ",(int)((en-st)/1000));
......@@ -106,7 +106,7 @@ public class DBWrapper extends DB
*/
public int scan(String table, String startkey, int recordcount, Set<String> fields, Vector<HashMap<String,ByteIterator>> result)
{
long st=System.nanoTime();
long st=_measurements.startTimeNs();
int res=_db.scan(table,startkey,recordcount,fields,result);
long en=System.nanoTime();
_measurements.measure("SCAN",(int)((en-st)/1000));
......@@ -125,7 +125,7 @@ public class DBWrapper extends DB
*/
public int update(String table, String key, HashMap<String,ByteIterator> values)
{
long st=System.nanoTime();
long st=_measurements.startTimeNs();
int res=_db.update(table,key,values);
long en=System.nanoTime();
_measurements.measure("UPDATE",(int)((en-st)/1000));
......@@ -144,7 +144,7 @@ public class DBWrapper extends DB
*/
public int insert(String table, String key, HashMap<String,ByteIterator> values)
{
long st=System.nanoTime();
long st=_measurements.startTimeNs();
int res=_db.insert(table,key,values);
long en=System.nanoTime();
_measurements.measure("INSERT",(int)((en-st)/1000));
......@@ -161,7 +161,7 @@ public class DBWrapper extends DB
*/
public int delete(String table, String key)
{
long st=System.nanoTime();
long st=_measurements.startTimeNs();
int res=_db.delete(table,key);
long en=System.nanoTime();
_measurements.measure("DELETE",(int)((en-st)/1000));
......
......@@ -100,6 +100,28 @@ public class Measurements
}
}
static class StartTimeHolder{
long time;
long startTime(){
if(time == 0) {
return System.nanoTime();
}
else {
return time;
}
}
}
ThreadLocal<StartTimeHolder> tls = new ThreadLocal<Measurements.StartTimeHolder>(){
protected StartTimeHolder initialValue() {
return new StartTimeHolder();
};
};
public void setStartTimeNs(long time){
tls.get().time=time;
}
public long startTimeNs(){
return tls.get().startTime();
}
/**
* Report a single value of a single metric. E.g. for read latency, operation="READ" and latency is the measured value.
*/
......
......@@ -305,6 +305,8 @@ public class CoreWorkload extends Workload
boolean orderedinserts;
int recordcount;
private Measurements _measurements = Measurements.getMeasurements();
protected static IntegerGenerator getFieldLengthGenerator(Properties p) throws WorkloadException{
IntegerGenerator fieldlengthgenerator;
......@@ -685,7 +687,7 @@ public class CoreWorkload extends Workload
HashMap<String,ByteIterator> cells =
new HashMap<String,ByteIterator>();
long st=System.nanoTime();
long st=_measurements.startTimeNs();
db.read(table,keyname,fields,cells);
......@@ -697,7 +699,7 @@ public class CoreWorkload extends Workload
verifyRow(keyname, cells);
}
Measurements.getMeasurements().measure("READ-MODIFY-WRITE", (int)((en-st)/1000));
_measurements .measure("READ-MODIFY-WRITE", (int)((en-st)/1000));
}
public void doTransactionScan(DB db)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment