import java.util.Deque;
import java.util.Iterator;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.tracecompass.internal.common.core.Activator;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-
/**
- * A BufferedBlockingQueue is a wrapper around a {@link ArrayBlockingQueue},
- * which provides input and output "buffers", so that chunks of elements are
- * inserted into the queue, rather than individual elements.
- *
+ * A BufferedBlockingQueue is a wrapper around a {@link BlockingQueue}, which
+ * provides input and output "buffers", so that chunks of elements are inserted
+ * into the output buffer, rather than individual elements.
+ * <p>
* The API provides usual put() and take() methods which work on single
* elements. This class abstracts the concept of chunking, as well as the
- * required locking, from the users
- *
+ * required locking, from the users.
+ * <p>
* The main use case is for when different threads are doing insertion and
* removal operations. The added buffering reduces the contention between those
* two threads.
*/
public class BufferedBlockingQueue<T> implements Iterable<T> {
- private final BlockingQueue<Deque<T>> fInnerQueue;
+ private final BlockingDeque<Deque<T>> fInnerQueue;
private final Lock fInputLock = new ReentrantLock();
private final Lock fOutputLock = new ReentrantLock();
private final int fChunkSize;
*/
private int fInputBufferSize;
+ private final AtomicInteger fSize = new AtomicInteger(0);
+
+ private final Condition fInnerQueueNotEmpty = checkNotNull(fOutputLock.newCondition());
+
/**
* Constructor
*
* The size of an individual chunk.
*/
public BufferedBlockingQueue(int queueSize, int chunkSize) {
- fInnerQueue = new ArrayBlockingQueue<>(queueSize);
+ /* Add one to the queue size for the output buffer */
+ fInnerQueue = new LinkedBlockingDeque<>(queueSize + 1);
fChunkSize = chunkSize;
fInputBuffer = new ConcurrentLinkedDeque<>();
/*
- * Set fOutputBuffer to something to avoid a null reference, even though
- * this particular object will never be used.
+ * Create an empty output buffer to avoid a null reference, and add it
+ * to the queue. The output buffer is always the head of the queue.
*/
fOutputBuffer = new ConcurrentLinkedDeque<>();
+ fInnerQueue.add(fOutputBuffer);
}
/**
- * Put an element into the queue.
- *
- * This method will block the caller if the inner queue is full, waiting for
- * space to become available.
+ * Put an element at the tail of the queue.
+ * <p>
+ * This method will block the caller if the output buffer is full, waiting
+ * for space to become available.
*
* @param element
* The element to insert
public void put(T element) {
fInputLock.lock();
try {
- fInputBuffer.addFirst(element);
+ fInputBuffer.add(element);
+ fSize.incrementAndGet();
fInputBufferSize++;
if (fInputBufferSize >= fChunkSize) {
this.flushInputBuffer();
/**
* Flush the current input buffer, disregarding the expected buffer size
* limit.
- *
+ * <p>
* This will guarantee that an element that was inserted via the
* {@link #put} method becomes visible to the {@link #take} method.
*
- * This method will block if the inner queue is currently full, waiting for
- * space to become available.
+ * This method will block if the output buffer is currently full, waiting
+ * for space to become available.
*/
public void flushInputBuffer() {
+ boolean signal = false;
fInputLock.lock();
try {
/*
- * This call blocks if fInputBuffer is full, effectively blocking
+ * This call blocks if the inner queue is full, effectively blocking
* the caller until elements are removed via the take() method.
*/
if (!fInputBuffer.isEmpty()) {
fInnerQueue.put(fInputBuffer);
fInputBuffer = new ConcurrentLinkedDeque<>();
fInputBufferSize = 0;
+ signal = true;
}
} catch (InterruptedException e) {
} finally {
fInputLock.unlock();
}
+ if (signal) {
+ fOutputLock.lock();
+ try {
+ fInnerQueueNotEmpty.signalAll();
+ } finally {
+ fOutputLock.unlock();
+ }
+ }
}
/**
- * Retrieve an element from the queue.
- *
- * If the queue is empty, this call will block until an element is inserted.
+ * Retrieve the head element from the queue.
+ * <p>
+ * If the output buffer is empty, this call will block until an element is
+ * inserted and fills the input buffer, or until the not-empty input buffer
+ * is otherwise manually flushed.
*
* @return The retrieved element. It will be removed from the queue.
*/
try {
if (fOutputBuffer.isEmpty()) {
/*
- * Our read buffer is empty, take the next buffer in the queue.
- * This call will block if the inner queue is empty.
+ * Our read buffer is empty, remove it from the queue and peek
+ * the next buffer in the queue. The loop will block if the
+ * inner queue is empty, releasing the lock while it waits.
+ */
+ fInnerQueue.remove();
+ while (fInnerQueue.isEmpty()) {
+ fInnerQueueNotEmpty.await();
+ }
+ fOutputBuffer = checkNotNull(fInnerQueue.peek());
+ }
+ /* Our implementation guarantees this output buffer is not empty. */
+ T element = checkNotNull(fOutputBuffer.remove());
+ fSize.decrementAndGet();
+ return element;
+ } catch (InterruptedException e) {
+ Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$
+ throw new IllegalStateException();
+ } finally {
+ fOutputLock.unlock();
+ }
+ }
+
+ /**
+ * Retrieve, but do not remove, the head element of this queue.
+ * <p>
+ * If the output buffer is empty, this call will block until an element is
+ * inserted and fills the input buffer, or until the not-empty input buffer
+ * is otherwise manually flushed.
+ *
+ * @return The head element of this queue, blocking until one is available
+ * @since 1.1
+ */
+ public T blockingPeek() {
+ fOutputLock.lock();
+ try {
+ if (fOutputBuffer.isEmpty()) {
+ /*
+ * Our read buffer is empty, remove it from the queue and peek
+ * the next buffer in the queue. The loop will block if the
+ * inner queue is empty, releasing the lock while it waits.
*/
- fOutputBuffer = checkNotNull(fInnerQueue.take());
+ fInnerQueue.remove();
+ while (fInnerQueue.isEmpty()) {
+ fInnerQueueNotEmpty.await();
+ }
+ fOutputBuffer = checkNotNull(fInnerQueue.peek());
}
- return checkNotNull(fOutputBuffer.removeLast());
+ /* Our implementation guarantees this output buffer is not empty. */
+ return checkNotNull(fOutputBuffer.peek());
} catch (InterruptedException e) {
Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$
throw new IllegalStateException();
}
/**
- * Does the queue contain at least one element?
+ * Returns true if the queue size is 0.
*
- * @return if the queue is empty
+ * @return true if the queue is empty
*/
public boolean isEmpty() {
- /*
- * All three isEmpty()s are very fast, but we are hoping it
- * short-circuits on the first two since it would not make sense to have
- * an empty front and back and a full middle.
- */
- return (fInputBuffer.isEmpty() && fOutputBuffer.isEmpty() && fInnerQueue.isEmpty());
+ return (fSize.get() == 0);
}
/**
- * Instantiate an iterator on the complete data structure. This includes the
- * inner queue as well as the input and output buffers.
+ * Returns the number of elements in this queue.
*
- * If concurrent insertions happen while the iterator is being used, it is
+ * @return the number of elements in this queue
+ * @since 1.1
+ */
+ public int size() {
+ return fSize.get();
+ }
+
+ /**
+ * Instantiate an iterator on the complete data structure. This includes the
+ * input buffer as well as the output buffer. The elements will be returned
+ * in order from last (tail) to first (head).
+ * <p>
+ * If concurrent removals happen while the iterator is being used, it is
* possible for an element that was actually in the queue when the call was
* made to have been removed by the {@link #take} method in the meantime.
* However, this iterator guarantees that each element is either inside the
* queue OR was removed by the {@link #take} method. No element should
* "fall in the cracks".
+ * <p>
+ * The iterator itself is not safe to use concurrently by different threads.
+ * <p>
+ * The {@link Iterator#remove()} operation is not supported by this
+ * iterator.
*
- * @return An iterator over the whole buffered queue
+ * @return An iterator over the whole buffered queue in reverse sequence
*/
@Override
public Iterator<T> iterator() {
+ return new Itr();
+ }
+
+ private class Itr implements Iterator<T> {
/*
- * Note that the iterators of ArrayBlockingQueue and
+ * Note that the iterators of LinkedBlockingDeque and
* ConcurrentLinkedDeque are thread-safe, which allows iterating on them
* while they are being modified without having to lock the accesses.
*
* To make sure we do not "miss" any elements, we need to look through
- * the input buffer first, then the inner queue, then the output buffer.
+ * the input buffer first, then the inner queue buffers in descending
+ * order, ending with the output buffer.
*/
+ private @Nullable T fNext = null;
+ private Iterator<T> fBufferIterator;
+ private final Iterator<Deque<T>> fQueueIterator;
+
+ Itr() {
+ fInputLock.lock();
+ try {
+ fBufferIterator = checkNotNull(fInputBuffer.descendingIterator());
+ fQueueIterator = checkNotNull(fInnerQueue.descendingIterator());
+ } finally {
+ fInputLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (fNext != null) {
+ return true;
+ }
+ if (fBufferIterator.hasNext()) {
+ fNext = fBufferIterator.next();
+ return true;
+ }
+ if (fQueueIterator.hasNext()) {
+ fBufferIterator = checkNotNull(fQueueIterator.next().descendingIterator());
+ return hasNext();
+ }
+ return false;
+ }
- Iterator<T> inputIterator = fInputBuffer.iterator();
- Iterator<T> queueIterator = Iterables.concat(fInnerQueue).iterator();
- Iterator<T> outputIterator = fOutputBuffer.iterator();
+ @Override
+ public T next() {
+ if (hasNext()) {
+ T next = fNext;
+ if (next != null) {
+ fNext = null;
+ return next;
+ }
+ }
+ throw new NoSuchElementException();
+ }
- return checkNotNull(Iterators.concat(inputIterator, queueIterator, outputIterator));
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
}
}