1 /*******************************************************************************
2 * Copyright (c) 2015 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Matthew Khouzam - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.tracecompass
.common
.core
.tests
.collect
;
15 import static org
.junit
.Assert
.assertEquals
;
16 import static org
.junit
.Assert
.assertFalse
;
17 import static org
.junit
.Assert
.assertTrue
;
18 import static org
.junit
.Assert
.fail
;
20 import java
.util
.Collection
;
21 import java
.util
.Deque
;
22 import java
.util
.HashSet
;
23 import java
.util
.LinkedList
;
24 import java
.util
.List
;
25 import java
.util
.Random
;
27 import java
.util
.concurrent
.Callable
;
28 import java
.util
.concurrent
.ExecutionException
;
29 import java
.util
.concurrent
.ExecutorService
;
30 import java
.util
.concurrent
.Executors
;
31 import java
.util
.concurrent
.Future
;
32 import java
.util
.concurrent
.TimeUnit
;
33 import java
.util
.stream
.Collectors
;
35 import org
.eclipse
.jdt
.annotation
.NonNull
;
36 import org
.eclipse
.tracecompass
.common
.core
.collect
.BufferedBlockingQueue
;
37 import org
.junit
.Before
;
38 import org
.junit
.Rule
;
39 import org
.junit
.Test
;
40 import org
.junit
.rules
.TestRule
;
41 import org
.junit
.rules
.Timeout
;
43 import com
.google
.common
.collect
.HashMultiset
;
44 import com
.google
.common
.collect
.Iterables
;
45 import com
.google
.common
.collect
.Iterators
;
46 import com
.google
.common
.primitives
.Chars
;
49 * Test suite for the {@link BufferedBlockingQueue}
51 public class BufferedBlockingQueueTest
{
53 /** Timeout the tests after 2 minutes */
55 public TestRule timeoutRule
= new Timeout(2, TimeUnit
.MINUTES
);
57 private static final String testString
= "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
58 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
59 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
61 private BufferedBlockingQueue
<Character
> charQueue
;
68 charQueue
= new BufferedBlockingQueue
<>(15, 15);
72 * Test inserting one element and removing it.
75 public void testSingleInsertion() {
76 Character element
= 'x';
77 charQueue
.put(element
);
78 charQueue
.flushInputBuffer();
80 Character out
= charQueue
.take();
81 assertEquals(element
, out
);
85 * Test insertion of elements that fit into the input buffer.
88 public void testSimpleInsertion() {
89 String string
= "Hello world!";
90 for (char elem
: string
.toCharArray()) {
93 charQueue
.flushInputBuffer();
95 StringBuilder sb
= new StringBuilder();
96 while (!charQueue
.isEmpty()) {
97 sb
.append(charQueue
.take());
99 assertEquals(string
, sb
.toString());
103 * Test insertion of elements that will require more than one input buffer.
106 public void testLargeInsertion() {
107 String string
= testString
.substring(0, 222);
108 for (char elem
: string
.toCharArray()) {
111 charQueue
.flushInputBuffer();
113 StringBuilder sb
= new StringBuilder();
114 while (!charQueue
.isEmpty()) {
115 sb
.append(charQueue
.take());
117 assertEquals(string
, sb
.toString());
121 * Test the state of the {@link BufferedBlockingQueue#isEmpty()} method at
125 public void testIsEmpty() {
126 BufferedBlockingQueue
<String
> stringQueue
= new BufferedBlockingQueue
<>(15, 15);
127 assertTrue(stringQueue
.isEmpty());
129 stringQueue
.put("Hello");
130 assertFalse(stringQueue
.isEmpty());
132 stringQueue
.flushInputBuffer();
133 assertFalse(stringQueue
.isEmpty());
135 stringQueue
.flushInputBuffer();
136 assertFalse(stringQueue
.isEmpty());
138 stringQueue
.flushInputBuffer();
140 assertTrue(stringQueue
.isEmpty());
142 stringQueue
.flushInputBuffer();
143 assertTrue(stringQueue
.isEmpty());
147 * Write random data in and read it, several times.
150 public void testOddInsertions() {
151 BufferedBlockingQueue
<Object
> objectQueue
= new BufferedBlockingQueue
<>(15, 15);
152 LinkedList
<Object
> expectedValues
= new LinkedList
<>();
153 Random rnd
= new Random();
156 for (int i
= 0; i
< 10; i
++) {
158 * The queue's total size is 225 (15x15). We must make sure to not
161 for (int j
= 0; j
< 50; j
++) {
162 Integer testInt
= rnd
.nextInt();
163 Long testLong
= rnd
.nextLong();
164 Double testDouble
= rnd
.nextDouble();
165 Double testGaussian
= rnd
.nextGaussian();
167 expectedValues
.add(testInt
);
168 expectedValues
.add(testLong
);
169 expectedValues
.add(testDouble
);
170 expectedValues
.add(testGaussian
);
171 objectQueue
.put(testInt
);
172 objectQueue
.put(testLong
);
173 objectQueue
.put(testDouble
);
174 objectQueue
.put(testGaussian
);
176 objectQueue
.flushInputBuffer();
178 while (!expectedValues
.isEmpty()) {
179 Object expected
= expectedValues
.removeFirst();
180 Object actual
= objectQueue
.take();
181 assertEquals(expected
, actual
);
187 * Read with a producer and a consumer
189 * @throws InterruptedException
190 * The test was interrupted
193 public void testMultiThread() throws InterruptedException
{
194 /* A character not found in the test string */
195 final Character lastElement
= '%';
197 Thread producer
= new Thread() {
200 for (char c
: testString
.toCharArray()) {
203 charQueue
.put(lastElement
);
204 charQueue
.flushInputBuffer();
209 Thread consumer
= new Thread() {
212 Character s
= charQueue
.take();
213 while (!s
.equals(lastElement
)) {
214 s
= charQueue
.take();
225 * Read with a producer and a consumer using
226 * {@link BufferedBlockingQueue#blockingPeek()}.
228 * @throws InterruptedException
229 * The test was interrupted
232 public void testBlockingPeek() throws InterruptedException
{
233 /* A character not found in the test string */
234 final Character lastElement
= '%';
236 final StringBuilder sb
= new StringBuilder();
238 Thread consumer
= new Thread() {
241 boolean isFinished
= false;
242 while (!isFinished
) {
243 // Read last element without removing it
244 Character s
= charQueue
.blockingPeek();
245 isFinished
= s
.equals(lastElement
);
256 Thread producer
= new Thread() {
259 for (char c
: testString
.toCharArray()) {
262 charQueue
.put(lastElement
);
263 charQueue
.flushInputBuffer();
271 assertEquals(testString
, sb
.toString());
275 * Test the contents returned by {@link BufferedBlockingQueue#iterator()}.
277 * The test is sequential, because the iterator has no guarantee wrt to its
278 * contents when run concurrently.
281 public void testIteratorContents() {
282 Deque
<Character
> expected
= new LinkedList
<>();
284 /* Iterator should be empty initially */
285 assertFalse(charQueue
.iterator().hasNext());
287 /* Insert the first 50 elements */
288 for (int i
= 0; i
< 50; i
++) {
289 char c
= testString
.charAt(i
);
291 expected
.addFirst(c
);
293 LinkedList
<Character
> actual
= new LinkedList
<>();
294 Iterators
.addAll(actual
, charQueue
.iterator());
295 assertSameElements(expected
, actual
);
298 * Insert more elements, flush the input buffer (should not affect the
301 for (int i
= 50; i
< 60; i
++) {
302 char c
= testString
.charAt(i
);
304 charQueue
.flushInputBuffer();
305 expected
.addFirst(c
);
307 actual
= new LinkedList
<>();
308 Iterators
.addAll(actual
, charQueue
.iterator());
309 assertSameElements(expected
, actual
);
311 /* Consume the 30 last elements from the queue */
312 for (int i
= 0; i
< 30; i
++) {
314 expected
.removeLast();
316 actual
= new LinkedList
<>();
317 Iterators
.addAll(actual
, charQueue
.iterator());
318 assertSameElements(expected
, actual
);
320 /* Now empty the queue */
321 while (!charQueue
.isEmpty()) {
323 expected
.removeLast();
325 assertFalse(charQueue
.iterator().hasNext());
329 * Utility method to verify that two collections contain the exact same
330 * elements, not necessarily in the same iteration order.
332 * {@link Collection#equals} requires the iteration order to be the same,
333 * which we do not want here.
335 * Using a {@link Set} or {@link Collection#containsAll} is not sufficient
336 * either, because those will throw away duplicate elements.
338 private static <T
> void assertSameElements(Collection
<T
> c1
, Collection
<T
> c2
) {
339 assertEquals(HashMultiset
.create(c1
), HashMultiset
.create(c2
));
343 * Test iterating on the queue while a producer and a consumer threads are
344 * using it. The iteration should not affect the elements taken by the
348 public void testConcurrentIteration() {
349 final BufferedBlockingQueue
<@NonNull String
> queue
= new BufferedBlockingQueue
<>(15, 15);
350 final String poisonPill
= "That's all folks!";
353 * Convert the test's testBuffer into an array of String, one for each
356 List
<String
> strings
= Chars
.asList(testString
.toCharArray()).stream()
357 .map(Object
::toString
)
358 .collect(Collectors
.toList());
360 Iterable
<Iterable
<String
>> results
=
361 runConcurrencyTest(queue
, strings
, poisonPill
, 1, 1, 1);
363 assertEquals(strings
, Iterables
.getOnlyElement(results
));
367 * Run a concurrency test on a {@link BufferedBlockingQueue}, with the
368 * specified number of producer, consumer and observer/iterator threads.
370 * The returned value represents the elements consumed by each consumer
371 * thread. Thus, if there is one consumer, the top-level {@link Iterable}
372 * will be of size 1, and the inner one should contain all the elements that
376 * The queue to run the test on
378 * The data set to insert in the queue. Every producer will
379 * insert one entire set.
381 * The "poison pill" to indicate the end. Simply make sure it is
382 * a element of type <T> that is not present in the 'testBuffer'.
383 * @param nbProducerThreads
384 * Number of producer threads. There should be at least 1.
385 * @param nbConsumerThreads
386 * Number of consumer threads. There should be at least 1.
387 * @param nbObserverThreads
388 * Number of observer threads. It should be >= 0.
389 * @return The consumed elements, as seen by each consumer thread.
391 private static <T
> Iterable
<Iterable
<T
>> runConcurrencyTest(final BufferedBlockingQueue
<@NonNull T
> queue
,
392 final List
<T
> testBuffer
,
393 final @NonNull T poisonPill
,
394 int nbProducerThreads
,
395 int nbConsumerThreads
,
396 int nbObserverThreads
) {
398 final class ProducerThread
implements Runnable
{
401 for (int i
= 0; i
< testBuffer
.size(); i
++) {
402 T elem
= testBuffer
.get(i
);
404 // TODO replace with List<@NonNull T> once we can
405 throw new IllegalArgumentException();
409 queue
.put(poisonPill
);
410 queue
.flushInputBuffer();
415 * The consumer thread will return the elements it read via its Future.
417 * Note that if there are multiple consumers, there is no guarantee with
418 * regards the contents of an individual one.
420 final class ConsumerThread
implements Callable
<Iterable
<T
>> {
422 public Iterable
<T
> call() {
423 List
<T
> results
= new LinkedList
<>();
424 T elem
= queue
.take();
425 while (!elem
.equals(poisonPill
)) {
433 final class ObserverThread
implements Runnable
{
436 for (int i
= 0; i
< 5; i
++) {
437 final Set
<T
> results
= new HashSet
<>();
438 for (T input
: queue
) {
440 * Do something with the element so that this iteration
441 * does not get optimized out.
449 if (nbProducerThreads
< 1 || nbConsumerThreads
< 1 || nbObserverThreads
< 0) {
450 throw new IllegalArgumentException();
453 final ExecutorService pool
= Executors
.newFixedThreadPool(
454 nbProducerThreads
+ nbConsumerThreads
+ nbObserverThreads
);
456 /* Consumed elements, per consumer thread */
457 List
<Future
<Iterable
<T
>>> consumedElements
= new LinkedList
<>();
459 for (int i
= 0; i
< nbProducerThreads
; i
++) {
460 pool
.submit(new ProducerThread());
462 for (int i
= 0; i
< nbConsumerThreads
; i
++) {
463 consumedElements
.add(pool
.submit(new ConsumerThread()));
465 for (int i
= 0; i
< nbObserverThreads
; i
++) {
466 pool
.submit(new ObserverThread());
469 List
<Iterable
<T
>> results
= new LinkedList
<>();
471 /* Convert the Future's to the actual return value */
472 for (Future
<Iterable
<T
>> future
: consumedElements
) {
473 Iterable
<T
> threadResult
= future
.get();
474 results
.add(threadResult
);
478 boolean success
= pool
.awaitTermination(2, TimeUnit
.MINUTES
);
480 throw new InterruptedException();
483 } catch (ExecutionException
| InterruptedException e
) {
484 fail(e
.getMessage());