common: Introduce a generic BufferedBlockingQueue
authorMatthew Khouzam <matthew.khouzam@ericsson.com>
Tue, 21 Apr 2015 02:08:05 +0000 (22:08 -0400)
committerMatthew Khouzam <matthew.khouzam@ericsson.com>
Fri, 1 May 2015 20:36:06 +0000 (16:36 -0400)
The BufferedBlockingQueue (nicknamed "BBQ") is a wrapper around
a standard ArrayBlockingQueue, which accumulates elements in a
separate Collection before putting them in the actual Queue.
This allows a producing and a consuming threads to not block each
other as much when they are producing and consuming at the same
time.

An identified use case is the ThreadedHistoryTreeProvider, and
perhaps eventually the AbstractTmfStateProvider too.

Change-Id: I010547d7914c4c377bf1c85f4f830bff0aa8740f
Signed-off-by: Matthew Khouzam <matthew.khouzam@ericsson.com>
Signed-off-by: Alexandre Montplaisir <alexmonthy@voxpopuli.im>
Reviewed-on: https://git.eclipse.org/r/46148
Reviewed-by: Hudson CI
org.eclipse.tracecompass.common.core/META-INF/MANIFEST.MF
org.eclipse.tracecompass.common.core/src/org/eclipse/tracecompass/common/core/collect/BufferedBlockingQueue.java [new file with mode: 0644]
org.eclipse.tracecompass.common.core/src/org/eclipse/tracecompass/common/core/collect/package-info.java [new file with mode: 0644]

index 01a23719c4ac66229c63529a30b3ffea8b8026e6..8e5ae77d797a4f3d19a9e9769cec2cffc0002f3e 100644 (file)
@@ -11,4 +11,6 @@ Bundle-RequiredExecutionEnvironment: JavaSE-1.7
 Require-Bundle: org.eclipse.core.runtime,
  org.eclipse.core.resources
 Export-Package: org.eclipse.tracecompass.common.core,
+ org.eclipse.tracecompass.common.core.collect,
  org.eclipse.tracecompass.internal.common.core;x-internal:=true
+Import-Package: com.google.common.collect
diff --git a/org.eclipse.tracecompass.common.core/src/org/eclipse/tracecompass/common/core/collect/BufferedBlockingQueue.java b/org.eclipse.tracecompass.common.core/src/org/eclipse/tracecompass/common/core/collect/BufferedBlockingQueue.java
new file mode 100644 (file)
index 0000000..b3365b0
--- /dev/null
@@ -0,0 +1,209 @@
+/*******************************************************************************
+ * Copyright (c) 2015 Ericsson, EfficiOS Inc., and others
+ *
+ * All rights reserved. This program and the accompanying materials are
+ * made available under the terms of the Eclipse Public License v1.0 which
+ * accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *   Matthew Khouzam - Initial API and implementation
+ *   Alexandre Montplaisir - Initial API and implementation
+ *******************************************************************************/
+
+package org.eclipse.tracecompass.common.core.collect;
+
+import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNull;
+
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.eclipse.tracecompass.internal.common.core.Activator;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+
+/**
+ * A BufferedBlockingQueue is a wrapper around a {@link ArrayBlockingQueue},
+ * which provides input and output "buffers", so that chunks of elements are
+ * inserted into the queue, rather than individual elements.
+ *
+ * The API provides usual put() and take() methods which work on single
+ * elements. This class abstracts the concept of chunking, as well as the
+ * required locking, from the users
+ *
+ * The main use case is for when different threads are doing insertion and
+ * removal operations. The added buffering reduces the contention between those
+ * two threads.
+ *
+ * @param <T>
+ *            The data type of the elements contained by the queue
+ * @since 1.0
+ */
+public class BufferedBlockingQueue<T> implements Iterable<T> {
+
+    private final BlockingQueue<Deque<T>> fInnerQueue;
+    private final Lock fInputLock = new ReentrantLock();
+    private final Lock fOutputLock = new ReentrantLock();
+    private final int fChunkSize;
+
+    private Deque<T> fInputBuffer;
+    private Deque<T> fOutputBuffer;
+
+    /*
+     * ConcurrentLinkedDeque's size() method does not run in constant time.
+     * Since we know we will only increment it, keep track of the number of
+     * insertions ourselves. It does not matter if the actual size is not exact.
+     */
+    private int fInputBufferSize;
+
+    /**
+     * Constructor
+     *
+     * @param queueSize
+     *            The size of the actual blocking queue. This is the number of
+     *            *chunks* that will go in the queue.
+     * @param chunkSize
+     *            The size of an individual chunk.
+     */
+    public BufferedBlockingQueue(int queueSize, int chunkSize) {
+        fInnerQueue = new ArrayBlockingQueue<>(queueSize);
+        fChunkSize = chunkSize;
+
+        fInputBuffer = new ConcurrentLinkedDeque<>();
+        /*
+         * Set fOutputBuffer to something to avoid a null reference, even though
+         * this particular object will never be used.
+         */
+        fOutputBuffer = new ConcurrentLinkedDeque<>();
+    }
+
+    /**
+     * Put an element into the queue.
+     *
+     * This method will block the caller if the inner queue is full, waiting for
+     * space to become available.
+     *
+     * @param element
+     *            The element to insert
+     */
+    public void put(T element) {
+        fInputLock.lock();
+        try {
+            fInputBuffer.addFirst(element);
+            fInputBufferSize++;
+            if (fInputBufferSize >= fChunkSize) {
+                this.flushInputBuffer();
+            }
+        } finally {
+            fInputLock.unlock();
+        }
+    }
+
+    /**
+     * Flush the current input buffer, disregarding the expected buffer size
+     * limit.
+     *
+     * This will guarantee that an element that was inserted via the
+     * {@link #put} method becomes visible to the {@link #take} method.
+     *
+     * This method will block if the inner queue is currently full, waiting for
+     * space to become available.
+     */
+    public void flushInputBuffer() {
+        fInputLock.lock();
+        try {
+            /*
+             * This call blocks if fInputBuffer is full, effectively blocking
+             * the caller until elements are removed via the take() method.
+             */
+            if (!fInputBuffer.isEmpty()) {
+                fInnerQueue.put(fInputBuffer);
+                fInputBuffer = new ConcurrentLinkedDeque<>();
+                fInputBufferSize = 0;
+            }
+
+        } catch (InterruptedException e) {
+            Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$
+        } finally {
+            fInputLock.unlock();
+        }
+    }
+
+    /**
+     * Retrieve an element from the queue.
+     *
+     * If the queue is empty, this call will block until an element is inserted.
+     *
+     * @return The retrieved element. It will be removed from the queue.
+     */
+    public T take() {
+        fOutputLock.lock();
+        try {
+            if (fOutputBuffer.isEmpty()) {
+                /*
+                 * Our read buffer is empty, take the next buffer in the queue.
+                 * This call will block if the inner queue is empty.
+                 */
+                fOutputBuffer = checkNotNull(fInnerQueue.take());
+            }
+            return checkNotNull(fOutputBuffer.removeLast());
+        } catch (InterruptedException e) {
+            Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$
+            throw new IllegalStateException();
+        } finally {
+            fOutputLock.unlock();
+        }
+    }
+
+    /**
+     * Does the queue contain at least one element?
+     *
+     * @return if the queue is empty
+     */
+    public boolean isEmpty() {
+        /*
+         * All three isEmpty()s are very fast, but we are hoping it
+         * short-circuits on the first two since it would not make sense to have
+         * an empty front and back and a full middle.
+         */
+        return (fInputBuffer.isEmpty() && fOutputBuffer.isEmpty() && fInnerQueue.isEmpty());
+    }
+
+    /**
+     * Instantiate an iterator on the complete data structure. This includes the
+     * inner queue as well as the input and output buffers.
+     *
+     * If concurrent insertions happen while the iterator is being used, it is
+     * possible for an element that was actually in the queue when the call was
+     * made to have been removed by the {@link #take} method in the meantime.
+     * However, this iterator guarantees that each element is either inside the
+     * queue OR was removed by the {@link #take} method. No element should
+     * "fall in the cracks".
+     *
+     * @return An iterator over the whole buffered queue
+     */
+    @Override
+    public Iterator<T> iterator() {
+        /*
+         * Note that the iterators of ArrayBlockingQueue and
+         * ConcurrentLinkedDeque are thread-safe, which allows iterating on them
+         * while they are being modified without having to lock the accesses.
+         *
+         * To make sure we do not "miss" any elements, we need to look through
+         * the input buffer first, then the inner queue, then the output buffer.
+         */
+
+        Iterator<T> inputIterator = fInputBuffer.iterator();
+        Iterator<T> queueIterator = Iterables.concat(fInnerQueue).iterator();
+        Iterator<T> outputIterator = fOutputBuffer.iterator();
+
+        return checkNotNull(Iterators.concat(inputIterator, queueIterator, outputIterator));
+    }
+
+}
diff --git a/org.eclipse.tracecompass.common.core/src/org/eclipse/tracecompass/common/core/collect/package-info.java b/org.eclipse.tracecompass.common.core/src/org/eclipse/tracecompass/common/core/collect/package-info.java
new file mode 100644 (file)
index 0000000..33a6c74
--- /dev/null
@@ -0,0 +1,14 @@
+/*******************************************************************************
+ * Copyright (c) 2015 EfficiOS Inc.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *    Alexandre Montplaisir - Initial API and implementation
+ *******************************************************************************/
+
+@org.eclipse.jdt.annotation.NonNullByDefault
+package org.eclipse.tracecompass.common.core.collect;
\ No newline at end of file
This page took 0.031124 seconds and 5 git commands to generate.