Bug 473441: Fix concurrency issue in ThreadedHistoryTreeBackend
authorBernd Hufmann <Bernd.Hufmann@ericsson.com>
Mon, 27 Jul 2015 17:13:46 +0000 (13:13 -0400)
committerPatrick Tasse <patrick.tasse@gmail.com>
Mon, 10 Aug 2015 19:17:23 +0000 (15:17 -0400)
We add a blockingPeek() method to the BBQ, which allows blocking
on the queue without removing elements from it. This allows
iterating threads doing query to continue seeing the element
while it is being inserted.

The implementation is changed so that the output buffer remains in the
inner queue until it is empty. Only then is it removed from the inner
queue and the next head of the queue becomes the output buffer.

The iterator is fixed to be a true descending iterator without
duplications and that properly handles concurrent modification of the
inner queue.

The size() method is added.

Change-Id: I9e708824748a625010aded82a3198f096c1fc427
Signed-off-by: Bernd Hufmann <Bernd.Hufmann@ericsson.com>
Signed-off-by: Alexandre Montplaisir <alexmonthy@voxpopuli.im>
Signed-off-by: Patrick Tasse <patrick.tasse@gmail.com>
Reviewed-on: https://git.eclipse.org/r/52640
Reviewed-by: Hudson CI
Reviewed-by: Bernd Hufmann <bernd.hufmann@ericsson.com>
Tested-by: Bernd Hufmann <bernd.hufmann@ericsson.com>
common/org.eclipse.tracecompass.common.core.tests/src/org/eclipse/tracecompass/common/core/tests/collect/BufferedBlockingQueueTest.java
common/org.eclipse.tracecompass.common.core/src/org/eclipse/tracecompass/common/core/collect/BufferedBlockingQueue.java
statesystem/org.eclipse.tracecompass.statesystem.core/src/org/eclipse/tracecompass/internal/statesystem/core/backend/historytree/ThreadedHistoryTreeBackend.java

index 0b3891bfa26971bcb4aef3d66c74283880d3219a..80f396cb991ebab68854a76dbf82be39e21cad12 100644 (file)
@@ -223,6 +223,56 @@ public class BufferedBlockingQueueTest {
         producer.join();
     }
 
+    /**
+     * Read with a producer and a consumer using
+     * {@link BufferedBlockingQueue#blockingPeek()}.
+     *
+     * @throws InterruptedException
+     *             The test was interrupted
+     */
+    @Test
+    public void testBlockingPeek() throws InterruptedException {
+        /* A character not found in the test string */
+        final Character lastElement = '%';
+
+        final StringBuilder sb = new StringBuilder();
+
+        Thread consumer = new Thread() {
+            @Override
+            public void run() {
+                boolean isFinished = false;
+                while (!isFinished) {
+                    // Read last element without removing it
+                    Character s = charQueue.blockingPeek();
+                    isFinished = s.equals(lastElement);
+                    if (!isFinished) {
+                        sb.append(s);
+                    }
+                    // Remove element
+                    charQueue.take();
+                }
+            }
+        };
+        consumer.start();
+
+        Thread producer = new Thread() {
+            @Override
+            public void run() {
+                for (char c : testString.toCharArray()) {
+                    charQueue.put(c);
+                }
+                charQueue.put(lastElement);
+                charQueue.flushInputBuffer();
+            }
+        };
+        producer.start();
+
+        producer.join();
+        consumer.join();
+
+        assertEquals(testString, sb.toString());
+    }
+
     /**
      * Test the contents returned by {@link BufferedBlockingQueue#iterator()}.
      *
index b3365b0b54d0fb18388db258890892f42cdf81b0..088ce19362eb3e1bec65d0a250eeda1a576ee2d2 100644 (file)
@@ -17,26 +17,28 @@ import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNull;
 
 import java.util.Deque;
 import java.util.Iterator;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.eclipse.jdt.annotation.Nullable;
 import org.eclipse.tracecompass.internal.common.core.Activator;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-
 /**
- * A BufferedBlockingQueue is a wrapper around a {@link ArrayBlockingQueue},
- * which provides input and output "buffers", so that chunks of elements are
- * inserted into the queue, rather than individual elements.
- *
+ * A BufferedBlockingQueue is a wrapper around a {@link BlockingQueue}, which
+ * provides input and output "buffers", so that chunks of elements are inserted
+ * into the output buffer, rather than individual elements.
+ * <p>
  * The API provides usual put() and take() methods which work on single
  * elements. This class abstracts the concept of chunking, as well as the
- * required locking, from the users
- *
+ * required locking, from the users.
+ * <p>
  * The main use case is for when different threads are doing insertion and
  * removal operations. The added buffering reduces the contention between those
  * two threads.
@@ -47,7 +49,7 @@ import com.google.common.collect.Iterators;
  */
 public class BufferedBlockingQueue<T> implements Iterable<T> {
 
-    private final BlockingQueue<Deque<T>> fInnerQueue;
+    private final BlockingDeque<Deque<T>> fInnerQueue;
     private final Lock fInputLock = new ReentrantLock();
     private final Lock fOutputLock = new ReentrantLock();
     private final int fChunkSize;
@@ -62,6 +64,10 @@ public class BufferedBlockingQueue<T> implements Iterable<T> {
      */
     private int fInputBufferSize;
 
+    private final AtomicInteger fSize = new AtomicInteger(0);
+
+    private final Condition fInnerQueueNotEmpty = checkNotNull(fOutputLock.newCondition());
+
     /**
      * Constructor
      *
@@ -72,22 +78,24 @@ public class BufferedBlockingQueue<T> implements Iterable<T> {
      *            The size of an individual chunk.
      */
     public BufferedBlockingQueue(int queueSize, int chunkSize) {
-        fInnerQueue = new ArrayBlockingQueue<>(queueSize);
+        /* Add one to the queue size for the output buffer */
+        fInnerQueue = new LinkedBlockingDeque<>(queueSize + 1);
         fChunkSize = chunkSize;
 
         fInputBuffer = new ConcurrentLinkedDeque<>();
         /*
-         * Set fOutputBuffer to something to avoid a null reference, even though
-         * this particular object will never be used.
+         * Create an empty output buffer to avoid a null reference, and add it
+         * to the queue. The output buffer is always the head of the queue.
          */
         fOutputBuffer = new ConcurrentLinkedDeque<>();
+        fInnerQueue.add(fOutputBuffer);
     }
 
     /**
-     * Put an element into the queue.
-     *
-     * This method will block the caller if the inner queue is full, waiting for
-     * space to become available.
+     * Put an element at the tail of the queue.
+     * <p>
+     * This method will block the caller if the output buffer is full, waiting
+     * for space to become available.
      *
      * @param element
      *            The element to insert
@@ -95,7 +103,8 @@ public class BufferedBlockingQueue<T> implements Iterable<T> {
     public void put(T element) {
         fInputLock.lock();
         try {
-            fInputBuffer.addFirst(element);
+            fInputBuffer.add(element);
+            fSize.incrementAndGet();
             fInputBufferSize++;
             if (fInputBufferSize >= fChunkSize) {
                 this.flushInputBuffer();
@@ -108,24 +117,26 @@ public class BufferedBlockingQueue<T> implements Iterable<T> {
     /**
      * Flush the current input buffer, disregarding the expected buffer size
      * limit.
-     *
+     * <p>
      * This will guarantee that an element that was inserted via the
      * {@link #put} method becomes visible to the {@link #take} method.
      *
-     * This method will block if the inner queue is currently full, waiting for
-     * space to become available.
+     * This method will block if the output buffer is currently full, waiting
+     * for space to become available.
      */
     public void flushInputBuffer() {
+        boolean signal = false;
         fInputLock.lock();
         try {
             /*
-             * This call blocks if fInputBuffer is full, effectively blocking
+             * This call blocks if the inner queue is full, effectively blocking
              * the caller until elements are removed via the take() method.
              */
             if (!fInputBuffer.isEmpty()) {
                 fInnerQueue.put(fInputBuffer);
                 fInputBuffer = new ConcurrentLinkedDeque<>();
                 fInputBufferSize = 0;
+                signal = true;
             }
 
         } catch (InterruptedException e) {
@@ -133,12 +144,22 @@ public class BufferedBlockingQueue<T> implements Iterable<T> {
         } finally {
             fInputLock.unlock();
         }
+        if (signal) {
+            fOutputLock.lock();
+            try {
+                fInnerQueueNotEmpty.signalAll();
+            } finally {
+                fOutputLock.unlock();
+            }
+        }
     }
 
     /**
-     * Retrieve an element from the queue.
-     *
-     * If the queue is empty, this call will block until an element is inserted.
+     * Retrieve the head element from the queue.
+     * <p>
+     * If the output buffer is empty, this call will block until an element is
+     * inserted and fills the input buffer, or until the not-empty input buffer
+     * is otherwise manually flushed.
      *
      * @return The retrieved element. It will be removed from the queue.
      */
@@ -147,12 +168,55 @@ public class BufferedBlockingQueue<T> implements Iterable<T> {
         try {
             if (fOutputBuffer.isEmpty()) {
                 /*
-                 * Our read buffer is empty, take the next buffer in the queue.
-                 * This call will block if the inner queue is empty.
+                 * Our read buffer is empty, remove it from the queue and peek
+                 * the next buffer in the queue. The loop will block if the
+                 * inner queue is empty, releasing the lock while it waits.
+                 */
+                fInnerQueue.remove();
+                while (fInnerQueue.isEmpty()) {
+                    fInnerQueueNotEmpty.await();
+                }
+                fOutputBuffer = checkNotNull(fInnerQueue.peek());
+            }
+            /* Our implementation guarantees this output buffer is not empty. */
+            T element = checkNotNull(fOutputBuffer.remove());
+            fSize.decrementAndGet();
+            return element;
+        } catch (InterruptedException e) {
+            Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$
+            throw new IllegalStateException();
+        } finally {
+            fOutputLock.unlock();
+        }
+    }
+
+    /**
+     * Retrieve, but do not remove, the head element of this queue.
+     * <p>
+     * If the output buffer is empty, this call will block until an element is
+     * inserted and fills the input buffer, or until the not-empty input buffer
+     * is otherwise manually flushed.
+     *
+     * @return The head element of this queue, blocking until one is available
+     * @since 1.1
+     */
+    public T blockingPeek() {
+        fOutputLock.lock();
+        try {
+            if (fOutputBuffer.isEmpty()) {
+                /*
+                 * Our read buffer is empty, remove it from the queue and peek
+                 * the next buffer in the queue. The loop will block if the
+                 * inner queue is empty, releasing the lock while it waits.
                  */
-                fOutputBuffer = checkNotNull(fInnerQueue.take());
+                fInnerQueue.remove();
+                while (fInnerQueue.isEmpty()) {
+                    fInnerQueueNotEmpty.await();
+                }
+                fOutputBuffer = checkNotNull(fInnerQueue.peek());
             }
-            return checkNotNull(fOutputBuffer.removeLast());
+            /* Our implementation guarantees this output buffer is not empty. */
+            return checkNotNull(fOutputBuffer.peek());
         } catch (InterruptedException e) {
             Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$
             throw new IllegalStateException();
@@ -162,48 +226,104 @@ public class BufferedBlockingQueue<T> implements Iterable<T> {
     }
 
     /**
-     * Does the queue contain at least one element?
+     * Returns true if the queue size is 0.
      *
-     * @return if the queue is empty
+     * @return true if the queue is empty
      */
     public boolean isEmpty() {
-        /*
-         * All three isEmpty()s are very fast, but we are hoping it
-         * short-circuits on the first two since it would not make sense to have
-         * an empty front and back and a full middle.
-         */
-        return (fInputBuffer.isEmpty() && fOutputBuffer.isEmpty() && fInnerQueue.isEmpty());
+        return (fSize.get() == 0);
     }
 
     /**
-     * Instantiate an iterator on the complete data structure. This includes the
-     * inner queue as well as the input and output buffers.
+     * Returns the number of elements in this queue.
      *
-     * If concurrent insertions happen while the iterator is being used, it is
+     * @return the number of elements in this queue
+     * @since 1.1
+     */
+    public int size() {
+        return fSize.get();
+    }
+
+    /**
+     * Instantiate an iterator on the complete data structure. This includes the
+     * input buffer as well as the output buffer. The elements will be returned
+     * in order from last (tail) to first (head).
+     * <p>
+     * If concurrent removals happen while the iterator is being used, it is
      * possible for an element that was actually in the queue when the call was
      * made to have been removed by the {@link #take} method in the meantime.
      * However, this iterator guarantees that each element is either inside the
      * queue OR was removed by the {@link #take} method. No element should
      * "fall in the cracks".
+     * <p>
+     * The iterator itself is not safe to use concurrently by different threads.
+     * <p>
+     * The {@link Iterator#remove()} operation is not supported by this
+     * iterator.
      *
-     * @return An iterator over the whole buffered queue
+     * @return An iterator over the whole buffered queue in reverse sequence
      */
     @Override
     public Iterator<T> iterator() {
+        return new Itr();
+    }
+
+    private class Itr implements Iterator<T> {
         /*
-         * Note that the iterators of ArrayBlockingQueue and
+         * Note that the iterators of LinkedBlockingDeque and
          * ConcurrentLinkedDeque are thread-safe, which allows iterating on them
          * while they are being modified without having to lock the accesses.
          *
          * To make sure we do not "miss" any elements, we need to look through
-         * the input buffer first, then the inner queue, then the output buffer.
+         * the input buffer first, then the inner queue buffers in descending
+         * order, ending with the output buffer.
          */
+        private @Nullable T fNext = null;
+        private Iterator<T> fBufferIterator;
+        private final Iterator<Deque<T>> fQueueIterator;
+
+        Itr() {
+            fInputLock.lock();
+            try {
+                fBufferIterator = checkNotNull(fInputBuffer.descendingIterator());
+                fQueueIterator = checkNotNull(fInnerQueue.descendingIterator());
+            } finally {
+                fInputLock.unlock();
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (fNext != null) {
+                return true;
+            }
+            if (fBufferIterator.hasNext()) {
+                fNext = fBufferIterator.next();
+                return true;
+            }
+            if (fQueueIterator.hasNext()) {
+                fBufferIterator = checkNotNull(fQueueIterator.next().descendingIterator());
+                return hasNext();
+            }
+            return false;
+        }
 
-        Iterator<T> inputIterator = fInputBuffer.iterator();
-        Iterator<T> queueIterator = Iterables.concat(fInnerQueue).iterator();
-        Iterator<T> outputIterator = fOutputBuffer.iterator();
+        @Override
+        public T next() {
+            if (hasNext()) {
+                T next = fNext;
+                if (next != null) {
+                    fNext = null;
+                    return next;
+                }
+            }
+            throw new NoSuchElementException();
+        }
 
-        return checkNotNull(Iterators.concat(inputIterator, queueIterator, outputIterator));
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
     }
 
 }
index 940264ca07f4204f5ba00d224ee5209ec4dea6fa..b998a4a0c11a3eb02446dce3732dd9518e6c1112 100644 (file)
@@ -186,13 +186,15 @@ public final class ThreadedHistoryTreeBackend extends HistoryTreeBackend
 
     @Override
     public void run() {
-        HTInterval currentInterval;
         try {
-            currentInterval = intervalQueue.take();
+            HTInterval currentInterval = intervalQueue.blockingPeek();
             while (currentInterval.getStartTime() != -1) {
                 /* Send the interval to the History Tree */
                 getSHT().insertInterval(currentInterval);
-                currentInterval = intervalQueue.take();
+                /* Actually remove the interval from the queue */
+                // FIXME Replace with remove() once it is implemented.
+                intervalQueue.take();
+                currentInterval = intervalQueue.blockingPeek();
             }
             if (currentInterval.getAttribute() != -1) {
                 /* Make sure this is the "poison pill" we are waiting for */
This page took 0.032266 seconds and 5 git commands to generate.