diff --git a/core/CHANGES.md b/core/CHANGES.md
new file mode 100644
index 0000000000000000000000000000000000000000..d6a53d4821e69ec0f33b19cc9efda75cb41663e3
--- /dev/null
+++ b/core/CHANGES.md
@@ -0,0 +1,67 @@
+When used as a latency under load benchmark YCSB in it's original form suffers from
+Coordinated Omission[1] and related measurement issue:
+
+* Load is controlled by response time
+* Measurement does not account for missing time
+* Measurement starts at beginning of request rather than at intended beginning
+* Measurement is limited in scope as the histogram does not provide data on overflow values
+
+To provide a minimal correction patch the following were implemented:
+
+1. Replace internal histogram implementation with HdrHistogram[2]:
+HdrHistogram offers a dynamic range of measurement at a given precision and will
+improve the fidelity of reporting. It allows capturing a much wider range of latencies.
+HdrHistogram also supports compressed loss-less serialization which enable capturing
+snapshot histograms from which lower resolution histograms can be constructed for plotting
+latency over time. Snapshot interval histograms are serialized on status reporting which
+must be enabled using the '-s' option.
+ 
+2. Track intended operation start and report latencies from that point in time:
+Assuming the benchmark sets a target schedule of execution in which every operation
+is supposed to happen at a given time the benchmark should measure the latency between
+intended start time and operation completion.
+This required the introduction of a new measurement point and inevitably
+includes measuring some of the internal preparation steps of the load generator.
+These overhead should be negligible in the context of a network hop, but could
+be corrected for by estimating the load-generator overheads (e.g. by measuring a
+no-op DB or by measuring the setup time for an operation and deducting that from total).
+This intended measurement point is only used when there is a target load (specified by
+the -target paramaeter)
+
+This branch supports the following new options:
+
+* -p measurementtype=[histogram|hdrhistogram|hdrhistogram+histogram|timeseries] (default=histogram)
+The new measurement types are hdrhistogram and hdrhistogram+histogram. Default is still
+histogram, which is the old histogram. Ultimately we would remove the old measurement types
+and use only HdrHistogram but the old measurement is left in there for comparison sake.
+
+* -p measurement.interval=[op|intended|both] (default=op)
+This new option deferentiates between measured intervals and adds the intended interval(as described)
+above, and the option to record both the op and intended for comparison.
+
+* -p hdrhistogram.fileoutput=[true|false] (default=false)
+This new option will enable periodical writes of the interval histogram into an output file. The path can be set using '-p hdrhistogram.output.path=<PATH>'.
+
+Example parameters:
+-target 1000 -s -p workload=com.yahoo.ycsb.workloads.CoreWorkload -p basicdb.verbose=false -p basicdb.simulatedelay=4 -p measurement.interval=both -p measurementtype=hdrhistogram -p hdrhistogram.fileoutput=true -p maxexecutiontime=60
+
+Further changes made:
+
+* -p status.interval=<number of seconds> (default=10)
+Controls the number of seconds between status reports and therefore between HdrHistogram snapshots reported.
+
+* -p basicdb.randomizedelay=[true|false] (default=true)
+Controls weather the delay simulated by the mock DB is uniformly random or not.
+
+Further suggestions:
+
+1. Correction load control: currently after a pause the load generator will do
+operations back to back to catchup, this leads to a flat out throughput mode
+of testing as opposed to controlled load.
+
+2. Move to async model: Scenarios where Ops have no dependency could delegate the
+Op execution to a threadpool and thus separate the request rate control from the
+synchronous execution of Ops. Measurement would start on queuing for execution.
+
+1. https://groups.google.com/forum/#!msg/mechanical-sympathy/icNZJejUHfE/BfDekfBEs_sJ
+2. https://github.com/HdrHistogram/HdrHistogram
\ No newline at end of file
diff --git a/core/pom.xml b/core/pom.xml
index c5ae61ecc7d992666b8e5526e30c5f84d15e5dc0..62acc96b72636d9464f21b484fc1de521e71c802 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -32,6 +32,33 @@
       <version>6.1.1</version>
       <scope>test</scope>
     </dependency>
-  </dependencies>	
-
+    <dependency>
+      <groupId>org.hdrhistogram</groupId>
+      <artifactId>HdrHistogram</artifactId>
+      <version>2.1.4</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>${maven.assembly.version}</version>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+          <appendAssemblyId>false</appendAssemblyId>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>
diff --git a/core/src/main/java/com/yahoo/ycsb/BasicDB.java b/core/src/main/java/com/yahoo/ycsb/BasicDB.java
index 9490451eff7e3be3d22ad30cbb1392d0712d8d6d..0eff6f3808f0197439a581c36b05a384846e367e 100644
--- a/core/src/main/java/com/yahoo/ycsb/BasicDB.java
+++ b/core/src/main/java/com/yahoo/ycsb/BasicDB.java
@@ -22,6 +22,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.Enumeration;
 import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
 
 
 /**
@@ -32,11 +34,15 @@ public class BasicDB extends DB
 	public static final String VERBOSE="basicdb.verbose";
 	public static final String VERBOSE_DEFAULT="true";
 	
-	public static final String SIMULATE_DELAY="basicdb.simulatedelay";
-	public static final String SIMULATE_DELAY_DEFAULT="0";
+    public static final String SIMULATE_DELAY="basicdb.simulatedelay";
+    public static final String SIMULATE_DELAY_DEFAULT="0";
+    
+    public static final String RANDOMIZE_DELAY="basicdb.randomizedelay";
+    public static final String RANDOMIZE_DELAY_DEFAULT="true";
+    
 	
-	
-	boolean verbose;
+    boolean verbose;
+    boolean randomizedelay;
 	int todelay;
 
 	public BasicDB()
@@ -49,14 +55,22 @@ public class BasicDB extends DB
 	{
 		if (todelay>0)
 		{
-			try
-			{
-				Thread.sleep((long)Utils.random().nextInt(todelay));
-			}
-			catch (InterruptedException e)
-			{
-				//do nothing
-			}
+		    long delayNs;
+		    if (randomizedelay) {
+		        delayNs = TimeUnit.MILLISECONDS.toNanos(Utils.random().nextInt(todelay));
+		        if (delayNs == 0) {
+		            return;
+		        }
+		    }
+		    else {
+		        delayNs = TimeUnit.MILLISECONDS.toNanos(todelay);
+		    }
+		    
+		    long now = System.nanoTime();
+            final long deadline = now + delayNs;
+            do {
+                LockSupport.parkNanos(deadline - now);
+            } while ((now = System.nanoTime()) < deadline && !Thread.interrupted());
 		}
 	}
 
@@ -69,7 +83,7 @@ public class BasicDB extends DB
 	{
 		verbose=Boolean.parseBoolean(getProperties().getProperty(VERBOSE, VERBOSE_DEFAULT));
 		todelay=Integer.parseInt(getProperties().getProperty(SIMULATE_DELAY, SIMULATE_DELAY_DEFAULT));
-		
+		randomizedelay=Boolean.parseBoolean(getProperties().getProperty(RANDOMIZE_DELAY, RANDOMIZE_DELAY_DEFAULT));
 		if (verbose)
 		{
 			System.out.println("***************** properties *****************");
diff --git a/core/src/main/java/com/yahoo/ycsb/Client.java b/core/src/main/java/com/yahoo/ycsb/Client.java
index 516cb682464263610dd9c812ac7437caba611c64..5b2c827e068c32a8ddda38173cb31fb6c871ff21 100644
--- a/core/src/main/java/com/yahoo/ycsb/Client.java
+++ b/core/src/main/java/com/yahoo/ycsb/Client.java
@@ -18,10 +18,6 @@
 package com.yahoo.ycsb;
 
 
-import com.yahoo.ycsb.measurements.Measurements;
-import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter;
-import com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter;
-
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -32,6 +28,12 @@ import java.util.Date;
 import java.util.Enumeration;
 import java.util.Properties;
 import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import com.yahoo.ycsb.measurements.Measurements;
+import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter;
+import com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter;
 
 //import org.apache.log4j.BasicConfigurator;
 
@@ -50,13 +52,14 @@ class StatusThread extends Thread
 	/**
 	 * The interval for reporting status.
 	 */
-	public static final long sleeptime=10000;
+	long _sleeptimeNs;
 
-	public StatusThread(Vector<Thread> threads, String label, boolean standardstatus)
+	public StatusThread(Vector<Thread> threads, String label, boolean standardstatus, int statusIntervalSeconds)
 	{
 		_threads=threads;
 		_label=label;
 		_standardstatus=standardstatus;
+		_sleeptimeNs=TimeUnit.SECONDS.toNanos(statusIntervalSeconds);
 	}
 
 	/**
@@ -64,8 +67,9 @@ class StatusThread extends Thread
 	 */
 	public void run()
 	{
-		long st=System.currentTimeMillis();
-
+		final long st=System.currentTimeMillis();
+		final long startTimeNanos = System.nanoTime();
+		long deadline = startTimeNanos + _sleeptimeNs;
 		long lasten=st;
 		long lasttotalops=0;
 		
@@ -118,15 +122,8 @@ class StatusThread extends Thread
 				System.out.println(msg);
 			}
 
-			try
-			{
-				sleep(sleeptime);
-			}
-			catch (InterruptedException e)
-			{
-				//do nothing
-			}
-
+			ClientThread.sleepUntil(deadline);
+			deadline+=_sleeptimeNs;
 		}
 		while (!alldone);
 	}
@@ -140,18 +137,20 @@ class StatusThread extends Thread
  */
 class ClientThread extends Thread
 {
-	DB _db;
+	private static boolean _spinSleep;
+    DB _db;
 	boolean _dotransactions;
 	Workload _workload;
 	int _opcount;
-	double _target;
+	double _targetOpsPerMs;
 
 	int _opsdone;
 	int _threadid;
 	int _threadcount;
 	Object _workloadstate;
 	Properties _props;
-
+    long _targetOpsTickNs;
+    final Measurements _measurements;
 
 	/**
 	 * Constructor.
@@ -173,11 +172,15 @@ class ClientThread extends Thread
 		_workload=workload;
 		_opcount=opcount;
 		_opsdone=0;
-		_target=targetperthreadperms;
+		if(targetperthreadperms > 0){
+		_targetOpsPerMs=targetperthreadperms;
+		_targetOpsTickNs=(long)(1000000/_targetOpsPerMs);
+		}
 		_threadid=threadid;
 		_threadcount=threadcount;
 		_props=props;
-		//System.out.println("Interval = "+interval);
+		_measurements = Measurements.getMeasurements();
+		_spinSleep = Boolean.valueOf(_props.getProperty("spin.sleep", "false"));
 	}
 
 	public int getOpsDone()
@@ -209,26 +212,22 @@ class ClientThread extends Thread
 			return;
 		}
 
-		//spread the thread operations out so they don't all hit the DB at the same time
-		try
-		{
-		   //GH issue 4 - throws exception if _target>1 because random.nextInt argument must be >0
-		   //and the sleep() doesn't make sense for granularities < 1 ms anyway
-		   if ( (_target>0) && (_target<=1.0) ) 
-		   {
-		      sleep(Utils.random().nextInt((int)(1.0/_target)));
-		   }
-		}
-		catch (InterruptedException e)
-		{
-		  // do nothing.
-		}
+		//NOTE: Switching to using nanoTime and parkNanos for time management here such that the measurements
+		// and the client thread have the same view on time.
 		
+		//spread the thread operations out so they don't all hit the DB at the same time
+		// GH issue 4 - throws exception if _target>1 because random.nextInt argument must be >0
+        // and the sleep() doesn't make sense for granularities < 1 ms anyway
+        if ((_targetOpsPerMs > 0) && (_targetOpsPerMs <= 1.0))
+        {
+            long randomMinorDelay = Utils.random().nextInt((int) _targetOpsTickNs);
+            sleepUntil(System.nanoTime() + randomMinorDelay);
+        }
 		try
 		{
 			if (_dotransactions)
 			{
-				long st=System.currentTimeMillis();
+			    long startTimeNanos = System.nanoTime();
 
 				while (((_opcount == 0) || (_opsdone < _opcount)) && !_workload.isStopRequested())
 				{
@@ -240,32 +239,13 @@ class ClientThread extends Thread
 
 					_opsdone++;
 
-					//throttle the operations
-					if (_target>0)
-					{
-						//this is more accurate than other throttling approaches we have tried,
-						//like sleeping for (1/target throughput)-operation latency,
-						//because it smooths timing inaccuracies (from sleep() taking an int, 
-						//current time in millis) over many operations
-						while (System.currentTimeMillis()-st<((double)_opsdone)/_target)
-						{
-							try
-							{
-								sleep(1);
-							}
-							catch (InterruptedException e)
-							{
-							  // do nothing.
-							}
-
-						}
-					}
+					throttleNanos(startTimeNanos);
 				}
 			}
 			else
 			{
-				long st=System.currentTimeMillis();
-
+			    long startTimeNanos = System.nanoTime();
+		        
 				while (((_opcount == 0) || (_opsdone < _opcount)) && !_workload.isStopRequested())
 				{
 
@@ -276,25 +256,7 @@ class ClientThread extends Thread
 
 					_opsdone++;
 
-					//throttle the operations
-					if (_target>0)
-					{
-						//this is more accurate than other throttling approaches we have tried,
-						//like sleeping for (1/target throughput)-operation latency,
-						//because it smooths timing inaccuracies (from sleep() taking an int, 
-						//current time in millis) over many operations
-						while (System.currentTimeMillis()-st<((double)_opsdone)/_target)
-						{
-							try 
-							{
-								sleep(1);
-							}
-							catch (InterruptedException e)
-							{
-							  // do nothing.
-							}
-						}
-					}
+					throttleNanos(startTimeNanos);
 				}
 			}
 		}
@@ -307,6 +269,7 @@ class ClientThread extends Thread
 
 		try
 		{
+		    _measurements.setIntendedStartTimeNs(0);
 			_db.cleanup();
 		}
 		catch (DBException e)
@@ -316,6 +279,26 @@ class ClientThread extends Thread
 			return;
 		}
 	}
+
+    static void sleepUntil(long deadline) {
+        long now = System.nanoTime();
+        while((now = System.nanoTime()) < deadline) {
+            if (!_spinSleep) {
+                LockSupport.parkNanos(deadline - now);
+            }
+        }
+    }
+    private void throttleNanos(long startTimeNanos) {
+        //throttle the operations
+        if (_targetOpsPerMs > 0)
+        {
+            // delay until next tick
+            long deadline = startTimeNanos + _opsdone*_targetOpsTickNs;
+            sleepUntil(deadline);
+            _measurements.setIntendedStartTimeNs(deadline);
+        }
+    }
+    
 }
 
 /**
@@ -324,6 +307,8 @@ class ClientThread extends Thread
 public class Client
 {
 
+	public static final String DEFAULT_RECORD_COUNT = "0";
+
     /**
      * The target number of operations to perform.
      */
@@ -378,6 +363,7 @@ public class Client
    */
   public static final String MAX_EXECUTION_TIME = "maxexecutiontime";
 
+
 	public static void usageMessage()
 	{
 		System.out.println("Usage: java com.yahoo.ycsb.Client [options]");
@@ -738,10 +724,9 @@ public class Client
 			}
 			else
 			{
-				opcount=Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY,"0"));
+				opcount=Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT));
 			}
 		}
-
 		Vector<Thread> threads=new Vector<Thread>();
 
 		for (int threadid=0; threadid<threadcount; threadid++)
@@ -757,7 +742,8 @@ public class Client
 				System.exit(0);
 			}
 
-			Thread t=new ClientThread(db,dotransactions,workload,threadid,threadcount,props,opcount/threadcount,targetperthreadperms);
+			
+            Thread t=new ClientThread(db,dotransactions,workload,threadid,threadcount,props,opcount/threadcount, targetperthreadperms);
 
 			threads.add(t);
 			//t.start();
@@ -768,11 +754,12 @@ public class Client
 		if (status)
 		{
 			boolean standardstatus=false;
-			if (props.getProperty(Measurements.MEASUREMENT_TYPE_PROPERTY,"").compareTo("timeseries")==0)
+			if (props.getProperty(Measurements.MEASUREMENT_TYPE_PROPERTY,"").compareTo("timeseries")==0) 
 			{
 				standardstatus=true;
-			}	
-			statusthread=new StatusThread(threads,label,standardstatus);
+			}
+			int statusIntervalSeconds = Integer.parseInt(props.getProperty("status.interval","10"));
+			statusthread=new StatusThread(threads,label,standardstatus,statusIntervalSeconds);
 			statusthread.start();
 		}
 
@@ -812,7 +799,13 @@ public class Client
 
 		if (status)
 		{
-			statusthread.interrupt();
+		    // wake up status thread if it's asleep
+		    statusthread.interrupt();
+		    // at this point we assume all the monitored threads are already gone as per above join loop.
+			try {
+                statusthread.join();
+            } catch (InterruptedException e) {
+            }
 		}
 
 		try
diff --git a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java
index 57a9648648d739cb2325756ed837cb2702f0fc18..50b314bb75196708eca4b030e9d66d78e9954ddc 100644
--- a/core/src/main/java/com/yahoo/ycsb/DBWrapper.java
+++ b/core/src/main/java/com/yahoo/ycsb/DBWrapper.java
@@ -69,10 +69,11 @@ public class DBWrapper extends DB
 	 */
 	public void cleanup() throws DBException
 	{
-    long st=System.nanoTime();
+        long ist=_measurements.getIntendedtartTimeNs();
+        long st = System.nanoTime();
 		_db.cleanup();
-    long en=System.nanoTime();
-    _measurements.measure("CLEANUP", (int)((en-st)/1000));
+        long en=System.nanoTime();
+        measure("CLEANUP",ist, st, en);
 	}
 
 	/**
@@ -86,11 +87,12 @@ public class DBWrapper extends DB
 	 */
 	public int read(String table, String key, Set<String> fields, HashMap<String,ByteIterator> result)
 	{
-		long st=System.nanoTime();
-		int res=_db.read(table,key,fields,result);
+	    long ist=_measurements.getIntendedtartTimeNs();
+	    long st = System.nanoTime();
+	    int res=_db.read(table,key,fields,result);
 		long en=System.nanoTime();
-		_measurements.measure("READ",(int)((en-st)/1000));
-		_measurements.reportReturnCode("READ",res);
+		measure("READ",ist, st, en);
+	    _measurements.reportReturnCode("READ",res);
 		return res;
 	}
 
@@ -106,13 +108,19 @@ public class DBWrapper extends DB
 	 */
 	public int scan(String table, String startkey, int recordcount, Set<String> fields, Vector<HashMap<String,ByteIterator>> result)
 	{
-		long st=System.nanoTime();
-		int res=_db.scan(table,startkey,recordcount,fields,result);
+	    long ist=_measurements.getIntendedtartTimeNs();
+	    long st = System.nanoTime();
+	    int res=_db.scan(table,startkey,recordcount,fields,result);
 		long en=System.nanoTime();
-		_measurements.measure("SCAN",(int)((en-st)/1000));
-		_measurements.reportReturnCode("SCAN",res);
+		measure("SCAN",ist, st, en);
+	    _measurements.reportReturnCode("SCAN",res);
 		return res;
 	}
+
+    private void measure(String op, long intendedStartTimeNanos, long startTimeNanos, long endTimeNanos) {
+        _measurements.measure(op, (int)((endTimeNanos-startTimeNanos)/1000));
+	    _measurements.measureIntended(op, (int)((endTimeNanos-intendedStartTimeNanos)/1000));
+    }
 	
 	/**
 	 * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified
@@ -125,10 +133,11 @@ public class DBWrapper extends DB
 	 */
 	public int update(String table, String key, HashMap<String,ByteIterator> values)
 	{
-		long st=System.nanoTime();
+	    long ist=_measurements.getIntendedtartTimeNs();
+	    long st = System.nanoTime();
 		int res=_db.update(table,key,values);
 		long en=System.nanoTime();
-		_measurements.measure("UPDATE",(int)((en-st)/1000));
+		measure("UPDATE",ist, st, en);
 		_measurements.reportReturnCode("UPDATE",res);
 		return res;
 	}
@@ -144,10 +153,11 @@ public class DBWrapper extends DB
 	 */
 	public int insert(String table, String key, HashMap<String,ByteIterator> values)
 	{
-		long st=System.nanoTime();
+	    long ist=_measurements.getIntendedtartTimeNs();
+	    long st = System.nanoTime();
 		int res=_db.insert(table,key,values);
 		long en=System.nanoTime();
-		_measurements.measure("INSERT",(int)((en-st)/1000));
+		measure("INSERT",ist, st, en);
 		_measurements.reportReturnCode("INSERT",res);
 		return res;
 	}
@@ -161,10 +171,11 @@ public class DBWrapper extends DB
 	 */
 	public int delete(String table, String key)
 	{
-		long st=System.nanoTime();
+	    long ist=_measurements.getIntendedtartTimeNs();
+	    long st = System.nanoTime();
 		int res=_db.delete(table,key);
 		long en=System.nanoTime();
-		_measurements.measure("DELETE",(int)((en-st)/1000));
+		measure("DELETE",ist, st, en);
 		_measurements.reportReturnCode("DELETE",res);
 		return res;
 	}
diff --git a/core/src/main/java/com/yahoo/ycsb/GoodBadUglyDB.java b/core/src/main/java/com/yahoo/ycsb/GoodBadUglyDB.java
new file mode 100644
index 0000000000000000000000000000000000000000..706438eb879bf93515dd31cefa6252696481c409
--- /dev/null
+++ b/core/src/main/java/com/yahoo/ycsb/GoodBadUglyDB.java
@@ -0,0 +1,162 @@
+/**                                                                                                                                                                                
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.                                                                                                                             
+ *                                                                                                                                                                                 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you                                                                                                             
+ * may not use this file except in compliance with the License. You                                                                                                                
+ * may obtain a copy of the License at                                                                                                                                             
+ *                                                                                                                                                                                 
+ * http://www.apache.org/licenses/LICENSE-2.0                                                                                                                                      
+ *                                                                                                                                                                                 
+ * Unless required by applicable law or agreed to in writing, software                                                                                                             
+ * distributed under the License is distributed on an "AS IS" BASIS,                                                                                                               
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or                                                                                                                 
+ * implied. See the License for the specific language governing                                                                                                                    
+ * permissions and limitations under the License. See accompanying                                                                                                                 
+ * LICENSE file.                                                                                                                                                                   
+ */
+
+package com.yahoo.ycsb;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+
+import java.util.HashMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Basic DB that just prints out the requested operations, instead of doing them against a database.
+ */
+public class GoodBadUglyDB extends DB {
+    public static final String SIMULATE_DELAY = "gbudb.delays";
+    public static final String SIMULATE_DELAY_DEFAULT = "200,1000,10000,50000,100000";
+    long delays[];
+    static ReadWriteLock DB_ACCESS = new ReentrantReadWriteLock();
+
+    public GoodBadUglyDB() {
+        delays = new long[] { 200, 1000, 10000, 50000, 200000 };
+    }
+
+    void delay() {
+        final Random random = Utils.random();
+        double p = random.nextDouble();
+        int mod;
+        if (p < 0.9) {
+            mod = 0;
+        } else if (p < 0.99) {
+            mod = 1;
+        } else if (p < 0.9999) {
+            mod = 2;
+        } else {
+            mod = 3;
+        }
+        // this will make mod 3 pauses global
+        Lock lock = mod == 3 ? DB_ACCESS.writeLock() : DB_ACCESS.readLock();
+        if (mod == 3) {
+            System.out.println("OUCH");
+        }
+        lock.lock();
+        try {
+            final long baseDelayNs = MICROSECONDS.toNanos(delays[mod]);
+            final int delayRangeNs = (int) (MICROSECONDS.toNanos(delays[mod+1]) - baseDelayNs);
+            final long delayNs = baseDelayNs + random.nextInt(delayRangeNs);
+            long now = System.nanoTime();
+            final long deadline = now + delayNs;
+            do {
+                LockSupport.parkNanos(deadline - now);
+            } while ((now = System.nanoTime()) < deadline && !Thread.interrupted());
+        }
+        finally {
+            lock.unlock();
+        }
+        
+    }
+
+    /**
+     * Initialize any state for this DB. Called once per DB instance; there is one DB instance per client thread.
+     */
+    public void init() {
+        int i=0;
+        for(String delay: getProperties().getProperty(SIMULATE_DELAY, SIMULATE_DELAY_DEFAULT).split(",")){
+            delays[i++] = Long.parseLong(delay);
+        }
+    }
+
+    /**
+     * Read a record from the database. Each field/value pair from the result will be stored in a HashMap.
+     *
+     * @param table The name of the table
+     * @param key The record key of the record to read.
+     * @param fields The list of fields to read, or null for all of them
+     * @param result A HashMap of field/value pairs for the result
+     * @return Zero on success, a non-zero error code on error
+     */
+    public int read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
+        delay();
+        return 0;
+    }
+
+    /**
+     * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored
+     * in a HashMap.
+     *
+     * @param table The name of the table
+     * @param startkey The record key of the first record to read.
+     * @param recordcount The number of records to read
+     * @param fields The list of fields to read, or null for all of them
+     * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
+     * @return Zero on success, a non-zero error code on error
+     */
+    public int scan(String table, String startkey, int recordcount, Set<String> fields,
+            Vector<HashMap<String, ByteIterator>> result) {
+        delay();
+
+        return 0;
+    }
+
+    /**
+     * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the
+     * record with the specified record key, overwriting any existing values with the same field name.
+     *
+     * @param table The name of the table
+     * @param key The record key of the record to write.
+     * @param values A HashMap of field/value pairs to update in the record
+     * @return Zero on success, a non-zero error code on error
+     */
+    public int update(String table, String key, HashMap<String, ByteIterator> values) {
+        delay();
+
+        return 0;
+    }
+
+    /**
+     * Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the
+     * record with the specified record key.
+     *
+     * @param table The name of the table
+     * @param key The record key of the record to insert.
+     * @param values A HashMap of field/value pairs to insert in the record
+     * @return Zero on success, a non-zero error code on error
+     */
+    public int insert(String table, String key, HashMap<String, ByteIterator> values) {
+        delay();
+        return 0;
+    }
+
+    /**
+     * Delete a record from the database.
+     *
+     * @param table The name of the table
+     * @param key The record key of the record to delete.
+     * @return Zero on success, a non-zero error code on error
+     */
+    public int delete(String table, String key) {
+        delay();
+        return 0;
+    }
+}
diff --git a/core/src/main/java/com/yahoo/ycsb/measurements/Measurements.java b/core/src/main/java/com/yahoo/ycsb/measurements/Measurements.java
index c082db4dddc6cd3efc09f7c3cd0f60987305e4e3..76ba40c16a5007a48b16c579ea10567c59cc2373 100644
--- a/core/src/main/java/com/yahoo/ycsb/measurements/Measurements.java
+++ b/core/src/main/java/com/yahoo/ycsb/measurements/Measurements.java
@@ -18,8 +18,8 @@
 package com.yahoo.ycsb.measurements;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 
 import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter;
 
@@ -34,6 +34,8 @@ public class Measurements
 	public static final String MEASUREMENT_TYPE_PROPERTY = "measurementtype";
 
 	private static final String MEASUREMENT_TYPE_PROPERTY_DEFAULT = "histogram";
+    public static final String MEASUREMENT_INTERVAL = "measurement.interval";
+    private static final String MEASUREMENT_INTERVAL_DEFAULT = "op";
 
 	static Measurements singleton=null;
 	
@@ -56,9 +58,10 @@ public class Measurements
 		return singleton;
 	}
 
-	HashMap<String,OneMeasurement> data;
-	boolean histogram=true;
-
+    final ConcurrentHashMap<String,OneMeasurement> _opToMesurementMap;
+    final ConcurrentHashMap<String,OneMeasurement> _opToIntendedMesurementMap;
+	final int _measurementType;
+	final int _measurementInterval;
 	private Properties _props;
 	
       /**
@@ -66,51 +69,110 @@ public class Measurements
        */
 	public Measurements(Properties props)
 	{
-		data=new HashMap<String,OneMeasurement>();
+        _opToMesurementMap=new ConcurrentHashMap<String,OneMeasurement>();
+        _opToIntendedMesurementMap=new ConcurrentHashMap<String,OneMeasurement>();
 		
 		_props=props;
 		
-		if (_props.getProperty(MEASUREMENT_TYPE_PROPERTY, MEASUREMENT_TYPE_PROPERTY_DEFAULT).compareTo("histogram")==0)
+		String mTypeString = _props.getProperty(MEASUREMENT_TYPE_PROPERTY, MEASUREMENT_TYPE_PROPERTY_DEFAULT);
+        if (mTypeString.equals("histogram"))
 		{
-			histogram=true;
+		    _measurementType = 0;
 		}
-		else
+		else if (mTypeString.equals("hdrhistogram"))
 		{
-			histogram=false;
-		}
-	}
-	
-	OneMeasurement constructOneMeasurement(String name)
-	{
-		if (histogram)
+            _measurementType = 1;
+        }
+		else if (mTypeString.equals("hdrhistogram+histogram"))
+        {
+            _measurementType = 2;
+        }
+		else if (mTypeString.equals("timeseries"))
 		{
-			return new OneMeasurementHistogram(name,_props);
+		    _measurementType = 3;
 		}
-		else
-		{
-			return new OneMeasurementTimeSeries(name,_props);
+		else {
+		    throw new IllegalArgumentException("unknown "+MEASUREMENT_TYPE_PROPERTY+"="+mTypeString);
 		}
+        
+        String mIntervalString = _props.getProperty(MEASUREMENT_INTERVAL, MEASUREMENT_INTERVAL_DEFAULT);
+        if (mIntervalString.equals("op"))
+        {
+            _measurementInterval = 0;
+        }
+        else if (mIntervalString.equals("intended"))
+        {
+            _measurementInterval = 1;
+        }
+        else if (mIntervalString.equals("both"))
+        {
+            _measurementInterval = 2;
+        }
+        else {
+            throw new IllegalArgumentException("unknown "+MEASUREMENT_INTERVAL+"="+mIntervalString);
+        }
+        
 	}
+	
+    OneMeasurement constructOneMeasurement(String name)
+    {
+        switch (_measurementType)
+        {
+        case 0:
+            return new OneMeasurementHistogram(name, _props);
+        case 1:
+            return new OneMeasurementHdrHistogram(name, _props);
+        case 2:
+            return new TwoInOneMeasurement(name, 
+                    new OneMeasurementHdrHistogram("Hdr"+name, _props), 
+                    new OneMeasurementHistogram("Bucket"+name, _props));
+        default:
+            return new OneMeasurementTimeSeries(name, _props);
+        }
+    }
 
-      /**
-       * Report a single value of a single metric. E.g. for read latency, operation="READ" and latency is the measured value.
-       */
-	public synchronized void measure(String operation, int latency)
+    static class StartTimeHolder{
+        long time;
+        long startTime(){
+            if(time == 0) {
+                return System.nanoTime();
+            }
+            else {
+                return time;
+            }
+        }
+    }
+    ThreadLocal<StartTimeHolder> tlIntendedStartTime = new ThreadLocal<Measurements.StartTimeHolder>(){
+      protected StartTimeHolder initialValue() {
+          return new StartTimeHolder();
+      };
+    };
+    public void setIntendedStartTimeNs(long time){
+        if(_measurementInterval==0)
+            return;
+        tlIntendedStartTime.get().time=time;
+    }
+    
+    public long getIntendedtartTimeNs(){
+        if(_measurementInterval==0)
+            return 0L;
+        return tlIntendedStartTime.get().startTime();
+    }
+
+    /**
+     * Report a single value of a single metric. E.g. for read latency, operation="READ" and latency is the measured
+     * value.
+     */
+	public void measure(String operation, int latency)
 	{
-		if (!data.containsKey(operation))
-		{
-			synchronized(this)
-			{
-				if (!data.containsKey(operation))
-				{
-					data.put(operation,constructOneMeasurement(operation));
-				}
-			}
-		}
+	    if(_measurementInterval==1)
+	        return;
 		try
 		{
-			data.get(operation).measure(latency);
+			OneMeasurement m = getOpMeasurement(operation);
+            m.measure(latency);
 		}
+		// This seems like a terribly hacky way to cover up for a bug in the measurement code
 		catch (java.lang.ArrayIndexOutOfBoundsException e)
 		{
 			System.out.println("ERROR: java.lang.ArrayIndexOutOfBoundsException - ignoring and continuing");
@@ -118,23 +180,64 @@ public class Measurements
 			e.printStackTrace(System.out);
 		}
 	}
+    /**
+     * Report a single value of a single metric. E.g. for read latency, operation="READ" and latency is the measured
+     * value.
+     */
+    public void measureIntended(String operation, int latency)
+    {
+        if(_measurementInterval==0)
+            return;
+        try
+        {
+            OneMeasurement m = getOpIntendedMeasurement(operation);
+            m.measure(latency);
+        }
+        // This seems like a terribly hacky way to cover up for a bug in the measurement code
+        catch (java.lang.ArrayIndexOutOfBoundsException e)
+        {
+            System.out.println("ERROR: java.lang.ArrayIndexOutOfBoundsException - ignoring and continuing");
+            e.printStackTrace();
+            e.printStackTrace(System.out);
+        }
+    }
 
+    private OneMeasurement getOpMeasurement(String operation) {
+        OneMeasurement m = _opToMesurementMap.get(operation);
+        if(m == null)
+        {
+            m = constructOneMeasurement(operation);
+            OneMeasurement oldM = _opToMesurementMap.putIfAbsent(operation, m);
+            if(oldM != null)
+            {
+                m = oldM;
+            }
+        }
+        return m;
+    }
+    private OneMeasurement getOpIntendedMeasurement(String operation) {
+        OneMeasurement m = _opToIntendedMesurementMap.get(operation);
+        if(m == null)
+        {
+            final String name = _measurementInterval==1 ? operation : "Intended-" + operation;
+            m = constructOneMeasurement(name);
+            OneMeasurement oldM = _opToIntendedMesurementMap.putIfAbsent(operation, m);
+            if(oldM != null)
+            {
+                m = oldM;
+            }
+        }
+        return m;
+    }
       /**
-       * Report a return code for a single DB operaiton.
+       * Report a return code for a single DB operation.
        */
 	public void reportReturnCode(String operation, int code)
 	{
-		if (!data.containsKey(operation))
-		{
-			synchronized(this)
-			{
-				if (!data.containsKey(operation))
-				{
-					data.put(operation,constructOneMeasurement(operation));
-				}
-			}
-		}
-		data.get(operation).reportReturnCode(code);
+	    OneMeasurement m = _measurementInterval==1 ? 
+	            getOpIntendedMeasurement(operation) : 
+	            getOpMeasurement(operation);
+		m.reportReturnCode(code);
 	}
 	
   /**
@@ -145,10 +248,14 @@ public class Measurements
    */
   public void exportMeasurements(MeasurementsExporter exporter) throws IOException
   {
-    for (OneMeasurement measurement : data.values())
-    {
-      measurement.exportMeasurements(exporter);
-    }
+      for (OneMeasurement measurement : _opToMesurementMap.values())
+      {
+        measurement.exportMeasurements(exporter);
+      }
+      for (OneMeasurement measurement : _opToIntendedMesurementMap.values())
+      {
+        measurement.exportMeasurements(exporter);
+      }
   }
 	
       /**
@@ -157,11 +264,14 @@ public class Measurements
 	public synchronized String getSummary()
 	{
 		String ret="";
-		for (OneMeasurement m : data.values())
+		for (OneMeasurement m : _opToMesurementMap.values())
 		{
 			ret+=m.getSummary()+" ";
 		}
-		
+		for (OneMeasurement m : _opToIntendedMesurementMap.values())
+        {
+            ret+=m.getSummary()+" ";
+        }
 		return ret;
 	}
 }
diff --git a/core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java b/core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java
new file mode 100644
index 0000000000000000000000000000000000000000..fa0735f854f758b59d07c6c15de07b89ccef8cb4
--- /dev/null
+++ b/core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java
@@ -0,0 +1,166 @@
+/**                                                                                                                                                                                
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.                                                                                                                             
+ *                                                                                                                                                                                 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you                                                                                                             
+ * may not use this file except in compliance with the License. You                                                                                                                
+ * may obtain a copy of the License at                                                                                                                                             
+ *                                                                                                                                                                                 
+ * http://www.apache.org/licenses/LICENSE-2.0                                                                                                                                      
+ *                                                                                                                                                                                 
+ * Unless required by applicable law or agreed to in writing, software                                                                                                             
+ * distributed under the License is distributed on an "AS IS" BASIS,                                                                                                               
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or                                                                                                                 
+ * implied. See the License for the specific language governing                                                                                                                    
+ * permissions and limitations under the License. See accompanying                                                                                                                 
+ * LICENSE file.                                                                                                                                                                   
+ */
+
+package com.yahoo.ycsb.measurements;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.text.DecimalFormat;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.HdrHistogram.Recorder;
+
+import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter;
+
+/**
+ * Take measurements and maintain a HdrHistogram of a given metric, such as READ LATENCY.
+ * 
+ * @author nitsanw
+ *
+ */
+public class OneMeasurementHdrHistogram extends OneMeasurement {
+    // we need one log per measurement histogram
+    final PrintStream log;
+    final HistogramLogWriter histogramLogWriter;
+    
+    final Recorder histogram = new Recorder(3);
+    final ConcurrentHashMap<Integer, AtomicInteger> returncodes;
+
+    Histogram totalHistogram;
+
+    public OneMeasurementHdrHistogram(String name, Properties props) {
+        super(name);
+        returncodes = new ConcurrentHashMap<Integer, AtomicInteger>();
+        boolean shouldLog = Boolean.parseBoolean(props.getProperty("hdrhistogram.fileoutput", "false"));
+        if (!shouldLog) {
+            log = null;
+            histogramLogWriter = null;
+            return;
+        }
+        try {
+            final String hdrOutputFilename = props.getProperty("hdrhistogram.output.path", "") +name+".hdr";
+            log = new PrintStream(new FileOutputStream(hdrOutputFilename), false);
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException("Failed to open hdr histogram output file",e);
+        }
+        histogramLogWriter = new HistogramLogWriter(log);
+        histogramLogWriter.outputComment("[Logging for: " + name + "]");
+        histogramLogWriter.outputLogFormatVersion();
+        histogramLogWriter.outputStartTime(System.currentTimeMillis());
+        histogramLogWriter.outputLegend();
+    }
+
+    /**
+     * No need for synchronization, using CHM to deal with that
+     * 
+     * @see com.yahoo.ycsb.OneMeasurement#reportReturnCode(int)
+     */
+    public void reportReturnCode(int code) {
+        Integer Icode = code;
+        AtomicInteger counter = returncodes.get(Icode);
+        if (counter == null) {
+            AtomicInteger other = returncodes.putIfAbsent(Icode, counter = new AtomicInteger());
+            if (other != null) {
+                counter = other;
+            }
+        }
+
+        counter.incrementAndGet();
+    }
+
+    /**
+     * It appears latency is reported in micros.
+     * Using {@link Recorder} to support concurrent updates to histogram.
+     * 
+     * @see com.yahoo.ycsb.OneMeasurement#measure(int)
+     */
+    public void measure(int latencyInMicros) {
+        histogram.recordValue(latencyInMicros);
+    }
+
+    /**
+     * This is called from a main thread, on orderly termination.
+     * 
+     * @see com.yahoo.ycsb.measurements.OneMeasurement#exportMeasurements(com.yahoo.ycsb.measurements.exporter.MeasurementsExporter)
+     */
+    @Override
+    public void exportMeasurements(MeasurementsExporter exporter) throws IOException {
+        // accumulate the last interval which was not caught by status thread
+        Histogram intervalHistogram = getIntervalHistogramAndAccumulate();
+        if(histogramLogWriter != null) {
+            histogramLogWriter.outputIntervalHistogram(intervalHistogram);
+            // we can close now
+            log.close();
+        }
+        exporter.write(getName(), "Operations", totalHistogram.getTotalCount());
+        exporter.write(getName(), "AverageLatency(us)", totalHistogram.getMean());
+        exporter.write(getName(), "MinLatency(us)", totalHistogram.getMinValue());
+        exporter.write(getName(), "MaxLatency(us)", totalHistogram.getMaxValue());
+        exporter.write(getName(), "95thPercentileLatency(ms)", totalHistogram.getValueAtPercentile(90)/1000);
+        exporter.write(getName(), "99thPercentileLatency(ms)", totalHistogram.getValueAtPercentile(99)/1000);
+
+        for (Map.Entry<Integer, AtomicInteger> entry : returncodes.entrySet()) {
+            exporter.write(getName(), "Return=" + entry.getKey(), entry.getValue().get());
+        }
+    }
+
+    /**
+     * This is called periodically from the StatusThread. There's a single StatusThread per Client process.
+     * We optionally serialize the interval to log on this opportunity.
+     * @see com.yahoo.ycsb.measurements.OneMeasurement#getSummary()
+     */
+    @Override
+    public String getSummary() {
+        Histogram intervalHistogram = getIntervalHistogramAndAccumulate();
+        // we use the summary interval as the histogram file interval.
+        if(histogramLogWriter != null) {
+            histogramLogWriter.outputIntervalHistogram(intervalHistogram);
+        }
+        
+        DecimalFormat d = new DecimalFormat("#.##");
+        return "[" + getName() + 
+                ": Count=" + intervalHistogram.getTotalCount() + 
+                ", Max=" + intervalHistogram.getMaxValue() + 
+                ", Min=" + intervalHistogram.getMinValue() +
+                ", Avg=" + d.format(intervalHistogram.getMean()) +
+                ", 90=" + d.format(intervalHistogram.getValueAtPercentile(90)) +
+                ", 99=" + d.format(intervalHistogram.getValueAtPercentile(99)) +
+                ", 99.9=" + d.format(intervalHistogram.getValueAtPercentile(99.9)) +
+                ", 99.99=" + d.format(intervalHistogram.getValueAtPercentile(99.99)) +"]";
+    }
+
+    private Histogram getIntervalHistogramAndAccumulate() {
+        Histogram intervalHistogram = histogram.getIntervalHistogram();
+        // add this to the total time histogram.
+        if (totalHistogram == null) {
+            totalHistogram = intervalHistogram; 
+        }
+        else {
+            totalHistogram.add(intervalHistogram);
+        }
+        return intervalHistogram;
+    }
+
+}
diff --git a/core/src/main/java/com/yahoo/ycsb/measurements/TwoInOneMeasurement.java b/core/src/main/java/com/yahoo/ycsb/measurements/TwoInOneMeasurement.java
new file mode 100644
index 0000000000000000000000000000000000000000..c48a3f776b5fcdf75195ce8be3752d8e18ca80d9
--- /dev/null
+++ b/core/src/main/java/com/yahoo/ycsb/measurements/TwoInOneMeasurement.java
@@ -0,0 +1,79 @@
+/**                                                                                                                                                                                
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.                                                                                                                             
+ *                                                                                                                                                                                 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you                                                                                                             
+ * may not use this file except in compliance with the License. You                                                                                                                
+ * may obtain a copy of the License at                                                                                                                                             
+ *                                                                                                                                                                                 
+ * http://www.apache.org/licenses/LICENSE-2.0                                                                                                                                      
+ *                                                                                                                                                                                 
+ * Unless required by applicable law or agreed to in writing, software                                                                                                             
+ * distributed under the License is distributed on an "AS IS" BASIS,                                                                                                               
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or                                                                                                                 
+ * implied. See the License for the specific language governing                                                                                                                    
+ * permissions and limitations under the License. See accompanying                                                                                                                 
+ * LICENSE file.                                                                                                                                                                   
+ */
+
+package com.yahoo.ycsb.measurements;
+
+import java.io.IOException;
+
+import org.HdrHistogram.Recorder;
+
+import com.yahoo.ycsb.measurements.exporter.MeasurementsExporter;
+
+/**
+ * delegates to 2 measurement instances.
+ * @author nitsanw
+ *
+ */
+public class TwoInOneMeasurement extends OneMeasurement {
+    final OneMeasurement thing1,thing2;
+    public TwoInOneMeasurement(String name, OneMeasurement thing1,OneMeasurement thing2) {
+        super(name);
+        this.thing1 = thing1;
+        this.thing2 = thing2;
+    }
+
+    /**
+     * No need for synchronization, using CHM to deal with that
+     * 
+     * @see com.yahoo.ycsb.OneMeasurement#reportReturnCode(int)
+     */
+    public void reportReturnCode(int code) {
+        thing1.reportReturnCode(code);
+    }
+
+    /**
+     * It appears latency is reported in micros.
+     * Using {@link Recorder} to support concurrent updates to histogram.
+     * 
+     * @see com.yahoo.ycsb.OneMeasurement#measure(int)
+     */
+    public void measure(int latencyInMicros) {
+        thing1.measure(latencyInMicros);
+        thing2.measure(latencyInMicros);
+    }
+
+    /**
+     * This is called from a main thread, on orderly termination.
+     * 
+     * @see com.yahoo.ycsb.measurements.OneMeasurement#exportMeasurements(com.yahoo.ycsb.measurements.exporter.MeasurementsExporter)
+     */
+    @Override
+    public void exportMeasurements(MeasurementsExporter exporter) throws IOException {
+        thing1.exportMeasurements(exporter);
+        thing2.exportMeasurements(exporter);
+    }
+
+    /**
+     * This is called periodically from the StatusThread. There's a single StatusThread per Client process.
+     * We optionally serialize the interval to log on this opportunity.
+     * @see com.yahoo.ycsb.measurements.OneMeasurement#getSummary()
+     */
+    @Override
+    public String getSummary() {
+        return thing1.getSummary() + "\n" + thing2.getSummary();
+    }
+}
diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
index 4056bca038205218b3b8931d398625aa835e530d..b4a477b40e8371eca90a8da07b2a4b7aaae9ac59 100644
--- a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
+++ b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
@@ -18,6 +18,7 @@
 package com.yahoo.ycsb.workloads;
 
 import java.util.Properties;
+
 import com.yahoo.ycsb.*;
 import com.yahoo.ycsb.generator.CounterGenerator;
 import com.yahoo.ycsb.generator.DiscreteGenerator;
@@ -305,6 +306,8 @@ public class CoreWorkload extends Workload
 	boolean orderedinserts;
 
 	int recordcount;
+
+    private Measurements _measurements = Measurements.getMeasurements();
 	
 	protected static IntegerGenerator getFieldLengthGenerator(Properties p) throws WorkloadException{
 		IntegerGenerator fieldlengthgenerator;
@@ -349,7 +352,9 @@ public class CoreWorkload extends Workload
 		double insertproportion=Double.parseDouble(p.getProperty(INSERT_PROPORTION_PROPERTY,INSERT_PROPORTION_PROPERTY_DEFAULT));
 		double scanproportion=Double.parseDouble(p.getProperty(SCAN_PROPORTION_PROPERTY,SCAN_PROPORTION_PROPERTY_DEFAULT));
 		double readmodifywriteproportion=Double.parseDouble(p.getProperty(READMODIFYWRITE_PROPORTION_PROPERTY,READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT));
-		recordcount=Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY));
+		recordcount=Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT));
+		if(recordcount == 0)
+		    recordcount = Integer.MAX_VALUE;
 		String requestdistrib=p.getProperty(REQUEST_DISTRIBUTION_PROPERTY,REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
 		int maxscanlength=Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY,MAX_SCAN_LENGTH_PROPERTY_DEFAULT));
 		String scanlengthdistrib=p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY,SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
@@ -685,8 +690,9 @@ public class CoreWorkload extends Workload
 		HashMap<String,ByteIterator> cells =
 		    new HashMap<String,ByteIterator>();
 
-		long st=System.nanoTime();
-
+		
+		long ist=_measurements.getIntendedtartTimeNs();
+	    long st = System.nanoTime();
 		db.read(table,keyname,fields,cells);
 		
 		db.update(table,keyname,values);
@@ -697,7 +703,8 @@ public class CoreWorkload extends Workload
       verifyRow(keyname, cells);
     }
 
-		Measurements.getMeasurements().measure("READ-MODIFY-WRITE", (int)((en-st)/1000));
+		_measurements .measure("READ-MODIFY-WRITE", (int)((en-st)/1000));
+		_measurements .measureIntended("READ-MODIFY-WRITE", (int)((en-ist)/1000));
 	}
 	
 	public void doTransactionScan(DB db)