package org.eclipse.tracecompass.common.core.tests.collect;
-import static org.eclipse.tracecompass.common.core.NonNullUtils.nullToEmptyString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
+import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.tracecompass.common.core.NonNullUtils;
import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue;
import org.junit.Before;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
+import com.google.common.base.Functions;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
+import com.google.common.primitives.Chars;
/**
* Test suite for the {@link BufferedBlockingQueue}
* Test iterating on the queue while a producer and a consumer threads are
* using it. The iteration should not affect the elements taken by the
* consumer.
- *
- * @throws InterruptedException
- * The test was interrupted
- * @throws ExecutionException
- * If one of the sub-threads throws an exception, which should
- * not happen
*/
@Test
- public void testConcurrentIteration() throws InterruptedException, ExecutionException {
+ public void testConcurrentIteration() {
final BufferedBlockingQueue<String> queue = new BufferedBlockingQueue<>(15, 15);
+ final String poisonPill = "That's all folks!";
- ExecutorService pool = Executors.newFixedThreadPool(3);
+ /*
+ * Convert the test's testBuffer into an array of String, one for each
+ * character. There are probably simpler ways of doing this, but it
+ * would not look as impressive.
+ */
+ FluentIterable<Character> fi = FluentIterable.from(Chars.asList(testString.toCharArray()));
+ List<String> strings = fi.transform(Functions.toStringFunction()).toList();
- final String poisonPill = "That's all folks!";
+ Iterable<Iterable<String>> results =
+ runConcurrencyTest(queue, strings, poisonPill, 1, 1, 1);
+
+ assertEquals(strings, Iterables.getOnlyElement(results));
+ }
- Runnable producer = new Runnable() {
+ /**
+ * Run a concurrency test on a {@link BufferedBlockingQueue}, with the
+ * specified number of producer, consumer and observer/iterator threads.
+ *
+ * The returned value represents the elements consumed by each consumer
+ * thread. Thus, if there is one consumer, the top-level {@link Iterable}
+ * will be of size 1, and the inner one should contain all the elements that
+ * were inserted.
+ *
+ * @param queue
+ * The queue to run the test on
+ * @param testBuffer
+ * The data set to insert in the queue. Every producer will
+ * insert one entire set.
+ * @param poisonPill
+ * The "poison pill" to indicate the end. Simply make sure it is
+ * a element of type <T> that is not present in the 'testBuffer'.
+ * @param nbProducerThreads
+ * Number of producer threads. There should be at least 1.
+ * @param nbConsumerThreads
+ * Number of consumer threads. There should be at least 1.
+ * @param nbObserverThreads
+ * Number of observer threads. It should be >= 0.
+ * @return The consumed elements, as seen by each consumer thread.
+ */
+ private static <T> Iterable<Iterable<T>> runConcurrencyTest(final BufferedBlockingQueue<T> queue,
+ final List<T> testBuffer,
+ final @NonNull T poisonPill,
+ int nbProducerThreads,
+ int nbConsumerThreads,
+ int nbObserverThreads) {
+
+ final class ProducerThread implements Runnable {
@Override
public void run() {
- for (int i = 0; i < testString.length(); i++) {
- queue.put(nullToEmptyString(String.valueOf(testString.charAt(i))));
+ for (int i = 0; i < testBuffer.size(); i++) {
+ T elem = testBuffer.get(i);
+ if (elem == null) {
+ // TODO replace with List<@NonNull T> once we can
+ throw new IllegalArgumentException();
+ }
+ queue.put(elem);
}
queue.put(poisonPill);
queue.flushInputBuffer();
}
- };
+ }
- Callable<String> consumer = new Callable<String>() {
+ /**
+ * The consumer thread will return the elements it read via its Future.
+ *
+ * Note that if there are multiple consumers, there is no guarantee with
+ * regards the contents of an individual one.
+ */
+ final class ConsumerThread implements Callable<Iterable<T>> {
@Override
- public String call() {
- StringBuilder sb = new StringBuilder();
- String s = queue.take();
- while (!s.equals(poisonPill)) {
- sb.append(s);
- s = queue.take();
+ public Iterable<T> call() {
+ List<T> results = new LinkedList<>();
+ T elem = queue.take();
+ while (!elem.equals(poisonPill)) {
+ results.add(elem);
+ elem = queue.take();
}
- return sb.toString();
+ return results;
}
- };
+ }
- Runnable inquisitor = new Runnable() {
+ final class ObserverThread implements Runnable {
@Override
public void run() {
- for (int i = 0; i < 10; i++) {
- final Set<String> results = new HashSet<>();
- /*
- * The interest of this test is here: we are iterating on
- * the queue while it is being used.
- */
- for (String input : queue) {
+ for (int i = 0; i < 5; i++) {
+ final Set<T> results = new HashSet<>();
+ for (T input : queue) {
+ /*
+ * Do something with the element so that this iteration
+ * does not get optimized out.
+ */
results.add(input);
}
}
}
- };
+ }
- pool.submit(producer);
- pool.submit(inquisitor);
- Future<String> message = pool.submit(consumer);
+ if (nbProducerThreads < 1 || nbConsumerThreads < 1 || nbObserverThreads < 0) {
+ throw new IllegalArgumentException();
+ }
- pool.shutdown();
- pool.awaitTermination(2, TimeUnit.MINUTES);
+ final ExecutorService pool = Executors.newFixedThreadPool(
+ nbProducerThreads + nbConsumerThreads + nbObserverThreads);
- assertEquals(testString, message.get());
- }
+ /* Consumed elements, per consumer thread */
+ List<Future<Iterable<T>>> consumedElements = new LinkedList<>();
+ for (int i = 0; i < nbProducerThreads; i++) {
+ pool.submit(new ProducerThread());
+ }
+ for (int i = 0; i < nbConsumerThreads; i++) {
+ consumedElements.add(pool.submit(new ConsumerThread()));
+ }
+ for (int i = 0; i < nbObserverThreads; i++) {
+ pool.submit(new ObserverThread());
+ }
+
+ List<Iterable<T>> results = new LinkedList<>();
+ try {
+ /* Convert the Future's to the actual return value */
+ for (Future<Iterable<T>> future : consumedElements) {
+ Iterable<T> threadResult = future.get();
+ results.add(threadResult);
+ }
+
+ pool.shutdown();
+ boolean success = pool.awaitTermination(2, TimeUnit.MINUTES);
+ if (!success) {
+ throw new InterruptedException();
+ }
+
+ } catch (ExecutionException | InterruptedException e) {
+ fail(e.getMessage());
+ }
+
+ return results;
+ }
-}
\ No newline at end of file
+}