From e12457434dbc9d625046ac13f255785b711a6129 Mon Sep 17 00:00:00 2001
From: Thomas Lopatic <thomas@lopatic.de>
Date: Sun, 19 Jul 2015 13:31:35 +0200
Subject: [PATCH] Don't use keys of inserted records before they have actually
 been written to the database.

---
 .../AcknowledgedCounterGenerator.java         | 43 +++++++++++++++++++
 .../yahoo/ycsb/workloads/CoreWorkload.java    |  6 ++-
 2 files changed, 47 insertions(+), 2 deletions(-)
 create mode 100644 core/src/main/java/com/yahoo/ycsb/generator/AcknowledgedCounterGenerator.java

diff --git a/core/src/main/java/com/yahoo/ycsb/generator/AcknowledgedCounterGenerator.java b/core/src/main/java/com/yahoo/ycsb/generator/AcknowledgedCounterGenerator.java
new file mode 100644
index 00000000..c8bcb42b
--- /dev/null
+++ b/core/src/main/java/com/yahoo/ycsb/generator/AcknowledgedCounterGenerator.java
@@ -0,0 +1,43 @@
+package com.yahoo.ycsb.generator;
+
+import java.util.PriorityQueue;
+
+/**
+ * A CounterGenerator that reports generated integers via lastInt()
+ * only after they have been acknowledged.
+ */
+public class AcknowledgedCounterGenerator extends CounterGenerator
+{
+	private PriorityQueue<Integer> ack;
+	private int limit;
+
+	/**
+	 * Create a counter that starts at countstart.
+	 */
+	public AcknowledgedCounterGenerator(int countstart)
+	{
+		super(countstart);
+		ack = new PriorityQueue<Integer>();
+		limit = countstart - 1;
+	}
+
+	@Override
+	public int lastInt()
+	{
+		return limit;
+	}
+
+	public synchronized void acknowledge(int value)
+	{
+		ack.add(value);
+
+		// move a contiguous sequence from the priority queue
+		// over to the "limit" variable
+
+		Integer min;
+
+		while ((min = ack.peek()) != null && min == limit + 1) {
+			limit = ack.poll();
+		}
+	}
+}
diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
index b4a477b4..2a145125 100644
--- a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
+++ b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java
@@ -20,6 +20,7 @@ package com.yahoo.ycsb.workloads;
 import java.util.Properties;
 
 import com.yahoo.ycsb.*;
+import com.yahoo.ycsb.generator.AcknowledgedCounterGenerator;
 import com.yahoo.ycsb.generator.CounterGenerator;
 import com.yahoo.ycsb.generator.DiscreteGenerator;
 import com.yahoo.ycsb.generator.ExponentialGenerator;
@@ -299,7 +300,7 @@ public class CoreWorkload extends Workload
 
 	Generator fieldchooser;
 
-	CounterGenerator transactioninsertkeysequence;
+	AcknowledgedCounterGenerator transactioninsertkeysequence;
 	
 	IntegerGenerator scanlength;
 	
@@ -417,7 +418,7 @@ public class CoreWorkload extends Workload
 			operationchooser.addValue(readmodifywriteproportion,"READMODIFYWRITE");
 		}
 
-		transactioninsertkeysequence=new CounterGenerator(recordcount);
+		transactioninsertkeysequence=new AcknowledgedCounterGenerator(recordcount);
 		if (requestdistrib.compareTo("uniform")==0)
 		{
 			keychooser=new UniformIntegerGenerator(0,recordcount-1);
@@ -763,5 +764,6 @@ public class CoreWorkload extends Workload
 
 		HashMap<String, ByteIterator> values = buildValues(dbkey);
 		db.insert(table,dbkey,values);
+		transactioninsertkeysequence.acknowledge(keynum);
 	}
 }
-- 
GitLab