Skip to content
Snippets Groups Projects
Commit 5e1ed2ac authored by Thomas Lopatic's avatar Thomas Lopatic
Browse files

Use a sliding window to track acknowledged values.

parent 15bffeed
No related branches found
No related tags found
No related merge requests found
package com.yahoo.ycsb.generator; package com.yahoo.ycsb.generator;
import java.util.PriorityQueue; import java.util.concurrent.locks.ReentrantLock;
/** /**
* A CounterGenerator that reports generated integers via lastInt() * A CounterGenerator that reports generated integers via lastInt()
...@@ -8,7 +8,10 @@ import java.util.PriorityQueue; ...@@ -8,7 +8,10 @@ import java.util.PriorityQueue;
*/ */
public class AcknowledgedCounterGenerator extends CounterGenerator public class AcknowledgedCounterGenerator extends CounterGenerator
{ {
private PriorityQueue<Integer> ack; private static final int WINDOW_SIZE = 10000;
private ReentrantLock lock;
private boolean[] window;
private int limit; private int limit;
/** /**
...@@ -17,7 +20,8 @@ public class AcknowledgedCounterGenerator extends CounterGenerator ...@@ -17,7 +20,8 @@ public class AcknowledgedCounterGenerator extends CounterGenerator
public AcknowledgedCounterGenerator(int countstart) public AcknowledgedCounterGenerator(int countstart)
{ {
super(countstart); super(countstart);
ack = new PriorityQueue<Integer>(); lock = new ReentrantLock();
window = new boolean[WINDOW_SIZE];
limit = countstart - 1; limit = countstart - 1;
} }
...@@ -34,17 +38,35 @@ public class AcknowledgedCounterGenerator extends CounterGenerator ...@@ -34,17 +38,35 @@ public class AcknowledgedCounterGenerator extends CounterGenerator
/** /**
* Make a generated counter value available via lastInt(). * Make a generated counter value available via lastInt().
*/ */
public synchronized void acknowledge(int value) public void acknowledge(int value)
{ {
ack.add(value); if (value > limit + WINDOW_SIZE) {
throw new RuntimeException("This should be a different exception.");
}
window[value % WINDOW_SIZE] = true;
if (lock.tryLock()) {
// move a contiguous sequence from the window
// over to the "limit" variable
try {
int index;
for (index = limit + 1; index <= value; ++index) {
int slot = index % WINDOW_SIZE;
// move a contiguous sequence from the priority queue if (!window[slot]) {
// over to the "limit" variable break;
}
Integer min; window[slot] = false;
}
while ((min = ack.peek()) != null && min == limit + 1) { limit = index - 1;
limit = ack.poll(); } finally {
lock.unlock();
}
} }
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment