gdbtrace: Move plugins to their own sub-directory
[deliverable/tracecompass.git] / org.eclipse.tracecompass.common.core.tests / src / org / eclipse / tracecompass / common / core / tests / collect / BufferedBlockingQueueTest.java
index a945059d173ce1f92d7a372a7eb02fb205fc4d2b..8a27e93f3cbe2e8a08ab709f812fa06eb16076d3 100644 (file)
 
 package org.eclipse.tracecompass.common.core.tests.collect;
 
-import static org.eclipse.tracecompass.common.core.NonNullUtils.nullToEmptyString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.Collection;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -30,6 +31,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.tracecompass.common.core.NonNullUtils;
 import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue;
 import org.junit.Before;
@@ -38,8 +40,12 @@ import org.junit.Test;
 import org.junit.rules.TestRule;
 import org.junit.rules.Timeout;
 
+import com.google.common.base.Functions;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
+import com.google.common.primitives.Chars;
 
 /**
  * Test suite for the {@link BufferedBlockingQueue}
@@ -289,70 +295,148 @@ public class BufferedBlockingQueueTest {
      * Test iterating on the queue while a producer and a consumer threads are
      * using it. The iteration should not affect the elements taken by the
      * consumer.
-     *
-     * @throws InterruptedException
-     *             The test was interrupted
-     * @throws ExecutionException
-     *             If one of the sub-threads throws an exception, which should
-     *             not happen
      */
     @Test
-    public void testConcurrentIteration() throws InterruptedException, ExecutionException {
+    public void testConcurrentIteration() {
         final BufferedBlockingQueue<String> queue = new BufferedBlockingQueue<>(15, 15);
+        final String poisonPill = "That's all folks!";
 
-        ExecutorService pool = Executors.newFixedThreadPool(3);
+        /*
+         * Convert the test's testBuffer into an array of String, one for each
+         * character. There are probably simpler ways of doing this, but it
+         * would not look as impressive.
+         */
+        FluentIterable<Character> fi = FluentIterable.from(Chars.asList(testString.toCharArray()));
+        List<String> strings = fi.transform(Functions.toStringFunction()).toList();
 
-        final String poisonPill = "That's all folks!";
+        Iterable<Iterable<String>> results =
+                runConcurrencyTest(queue, strings, poisonPill, 1, 1, 1);
+
+        assertEquals(strings, Iterables.getOnlyElement(results));
+    }
 
-        Runnable producer = new Runnable() {
+    /**
+     * Run a concurrency test on a {@link BufferedBlockingQueue}, with the
+     * specified number of producer, consumer and observer/iterator threads.
+     *
+     * The returned value represents the elements consumed by each consumer
+     * thread. Thus, if there is one consumer, the top-level {@link Iterable}
+     * will be of size 1, and the inner one should contain all the elements that
+     * were inserted.
+     *
+     * @param queue
+     *            The queue to run the test on
+     * @param testBuffer
+     *            The data set to insert in the queue. Every producer will
+     *            insert one entire set.
+     * @param poisonPill
+     *            The "poison pill" to indicate the end. Simply make sure it is
+     *            a element of type <T> that is not present in the 'testBuffer'.
+     * @param nbProducerThreads
+     *            Number of producer threads. There should be at least 1.
+     * @param nbConsumerThreads
+     *            Number of consumer threads. There should be at least 1.
+     * @param nbObserverThreads
+     *            Number of observer threads. It should be >= 0.
+     * @return The consumed elements, as seen by each consumer thread.
+     */
+    private static <T> Iterable<Iterable<T>> runConcurrencyTest(final BufferedBlockingQueue<T> queue,
+            final List<T> testBuffer,
+            final @NonNull T poisonPill,
+            int nbProducerThreads,
+            int nbConsumerThreads,
+            int nbObserverThreads) {
+
+        final class ProducerThread implements Runnable {
             @Override
             public void run() {
-                for (int i = 0; i < testString.length(); i++) {
-                    queue.put(nullToEmptyString(String.valueOf(testString.charAt(i))));
+                for (int i = 0; i < testBuffer.size(); i++) {
+                    T elem = testBuffer.get(i);
+                    if (elem == null) {
+                        // TODO replace with List<@NonNull T> once we can
+                        throw new IllegalArgumentException();
+                    }
+                    queue.put(elem);
                 }
                 queue.put(poisonPill);
                 queue.flushInputBuffer();
             }
-        };
+        }
 
-        Callable<String> consumer = new Callable<String>() {
+        /**
+         * The consumer thread will return the elements it read via its Future.
+         *
+         * Note that if there are multiple consumers, there is no guarantee with
+         * regards the contents of an individual one.
+         */
+        final class ConsumerThread implements Callable<Iterable<T>> {
             @Override
-            public String call() {
-                StringBuilder sb = new StringBuilder();
-                String s = queue.take();
-                while (!s.equals(poisonPill)) {
-                    sb.append(s);
-                    s = queue.take();
+            public Iterable<T> call() {
+                List<T> results = new LinkedList<>();
+                T elem = queue.take();
+                while (!elem.equals(poisonPill)) {
+                    results.add(elem);
+                    elem = queue.take();
                 }
-                return sb.toString();
+                return results;
             }
-        };
+        }
 
-        Runnable inquisitor = new Runnable() {
+        final class ObserverThread implements Runnable {
             @Override
             public void run() {
-                for (int i = 0; i < 10; i++) {
-                    final Set<String> results = new HashSet<>();
-                    /*
-                     * The interest of this test is here: we are iterating on
-                     * the queue while it is being used.
-                     */
-                    for (String input : queue) {
+                for (int i = 0; i < 5; i++) {
+                    final Set<T> results = new HashSet<>();
+                    for (T input : queue) {
+                        /*
+                         * Do something with the element so that this iteration
+                         * does not get optimized out.
+                         */
                         results.add(input);
                     }
                 }
             }
-        };
+        }
 
-        pool.submit(producer);
-        pool.submit(inquisitor);
-        Future<String> message = pool.submit(consumer);
+        if (nbProducerThreads < 1 || nbConsumerThreads < 1 || nbObserverThreads < 0) {
+            throw new IllegalArgumentException();
+        }
 
-        pool.shutdown();
-        pool.awaitTermination(2, TimeUnit.MINUTES);
+        final ExecutorService pool = Executors.newFixedThreadPool(
+                nbProducerThreads + nbConsumerThreads + nbObserverThreads);
 
-        assertEquals(testString, message.get());
-    }
+        /* Consumed elements, per consumer thread */
+        List<Future<Iterable<T>>> consumedElements = new LinkedList<>();
 
+        for (int i = 0; i < nbProducerThreads; i++) {
+            pool.submit(new ProducerThread());
+        }
+        for (int i = 0; i < nbConsumerThreads; i++) {
+            consumedElements.add(pool.submit(new ConsumerThread()));
+        }
+        for (int i = 0; i < nbObserverThreads; i++) {
+            pool.submit(new ObserverThread());
+        }
+
+        List<Iterable<T>> results = new LinkedList<>();
+        try {
+            /* Convert the Future's to the actual return value */
+            for (Future<Iterable<T>> future : consumedElements) {
+                Iterable<T> threadResult = future.get();
+                results.add(threadResult);
+            }
+
+            pool.shutdown();
+            boolean success = pool.awaitTermination(2, TimeUnit.MINUTES);
+            if (!success) {
+                throw new InterruptedException();
+            }
+
+        } catch (ExecutionException | InterruptedException e) {
+            fail(e.getMessage());
+        }
+
+        return results;
+    }
 
-}
\ No newline at end of file
+}
This page took 0.031452 seconds and 5 git commands to generate.