diff --git a/core/src/main/java/com/yahoo/ycsb/generator/AcknowledgedCounterGenerator.java b/core/src/main/java/com/yahoo/ycsb/generator/AcknowledgedCounterGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..c8bcb42b7c06ff61025c0c1e1e36db9830f0657d --- /dev/null +++ b/core/src/main/java/com/yahoo/ycsb/generator/AcknowledgedCounterGenerator.java @@ -0,0 +1,43 @@ +package com.yahoo.ycsb.generator; + +import java.util.PriorityQueue; + +/** + * A CounterGenerator that reports generated integers via lastInt() + * only after they have been acknowledged. + */ +public class AcknowledgedCounterGenerator extends CounterGenerator +{ + private PriorityQueue<Integer> ack; + private int limit; + + /** + * Create a counter that starts at countstart. + */ + public AcknowledgedCounterGenerator(int countstart) + { + super(countstart); + ack = new PriorityQueue<Integer>(); + limit = countstart - 1; + } + + @Override + public int lastInt() + { + return limit; + } + + public synchronized void acknowledge(int value) + { + ack.add(value); + + // move a contiguous sequence from the priority queue + // over to the "limit" variable + + Integer min; + + while ((min = ack.peek()) != null && min == limit + 1) { + limit = ack.poll(); + } + } +} diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java index b4a477b40e8371eca90a8da07b2a4b7aaae9ac59..2a145125be2d84ef6aa832a2eb0640b6b7e399d7 100644 --- a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java +++ b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java @@ -20,6 +20,7 @@ package com.yahoo.ycsb.workloads; import java.util.Properties; import com.yahoo.ycsb.*; +import com.yahoo.ycsb.generator.AcknowledgedCounterGenerator; import com.yahoo.ycsb.generator.CounterGenerator; import com.yahoo.ycsb.generator.DiscreteGenerator; import com.yahoo.ycsb.generator.ExponentialGenerator; @@ -299,7 +300,7 @@ public class CoreWorkload extends Workload Generator fieldchooser; - CounterGenerator transactioninsertkeysequence; + AcknowledgedCounterGenerator transactioninsertkeysequence; IntegerGenerator scanlength; @@ -417,7 +418,7 @@ public class CoreWorkload extends Workload operationchooser.addValue(readmodifywriteproportion,"READMODIFYWRITE"); } - transactioninsertkeysequence=new CounterGenerator(recordcount); + transactioninsertkeysequence=new AcknowledgedCounterGenerator(recordcount); if (requestdistrib.compareTo("uniform")==0) { keychooser=new UniformIntegerGenerator(0,recordcount-1); @@ -763,5 +764,6 @@ public class CoreWorkload extends Workload HashMap<String, ByteIterator> values = buildValues(dbkey); db.insert(table,dbkey,values); + transactioninsertkeysequence.acknowledge(keynum); } }