1 /*******************************************************************************
2 * Copyright (c) 2015 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Matthew Khouzam - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.tracecompass
.common
.core
.tests
.collect
;
15 import static org
.junit
.Assert
.assertEquals
;
16 import static org
.junit
.Assert
.assertFalse
;
17 import static org
.junit
.Assert
.assertTrue
;
18 import static org
.junit
.Assert
.assertNotNull
;
19 import static org
.junit
.Assert
.fail
;
21 import java
.util
.Collection
;
22 import java
.util
.Deque
;
23 import java
.util
.HashSet
;
24 import java
.util
.LinkedList
;
25 import java
.util
.List
;
26 import java
.util
.Random
;
28 import java
.util
.concurrent
.Callable
;
29 import java
.util
.concurrent
.ExecutionException
;
30 import java
.util
.concurrent
.ExecutorService
;
31 import java
.util
.concurrent
.Executors
;
32 import java
.util
.concurrent
.Future
;
33 import java
.util
.concurrent
.TimeUnit
;
34 import java
.util
.stream
.Collectors
;
36 import org
.eclipse
.jdt
.annotation
.NonNull
;
37 import org
.eclipse
.tracecompass
.common
.core
.collect
.BufferedBlockingQueue
;
38 import org
.junit
.Before
;
39 import org
.junit
.Rule
;
40 import org
.junit
.Test
;
41 import org
.junit
.rules
.TestRule
;
42 import org
.junit
.rules
.Timeout
;
44 import com
.google
.common
.collect
.HashMultiset
;
45 import com
.google
.common
.collect
.Iterables
;
46 import com
.google
.common
.collect
.Iterators
;
47 import com
.google
.common
.primitives
.Chars
;
50 * Test suite for the {@link BufferedBlockingQueue}
52 public class BufferedBlockingQueueTest
{
54 /** Timeout the tests after 2 minutes */
56 public TestRule timeoutRule
= new Timeout(2, TimeUnit
.MINUTES
);
58 private static final String testString
= "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
59 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
60 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
62 private BufferedBlockingQueue
<Character
> charQueue
;
69 charQueue
= new BufferedBlockingQueue
<>(15, 15);
73 * Test with chunkSize = 1 and buffer queueSize = 1
75 public void testValidConstructor1() {
76 assertNotNull(new BufferedBlockingQueue
<>(1, 1));
80 * Test with chunkSize = 0 and buffer queueSize = 0
82 * Should fail with an {@link IllegalArgumentException}
84 @Test(expected
= IllegalArgumentException
.class)
85 public void testInvalidConstructor1() {
86 assertNotNull(new BufferedBlockingQueue
<>(0, 0));
90 * Test with chunkSize = 0 and buffer queueSize = 1
92 * Should fail with an {@link IllegalArgumentException}
94 @Test(expected
= IllegalArgumentException
.class)
95 public void testInvalidConstructor2() {
96 assertNotNull(new BufferedBlockingQueue
<>(1, 0));
100 * Test with chunkSize = 1 and buffer queueSize = 0
102 public void testInvalidConstructor3() {
103 assertNotNull(new BufferedBlockingQueue
<>(0, 1));
107 * Test with chunkSize = 1 and buffer queueSize =-1
109 * Should fail with an {@link IllegalArgumentException}
111 @Test(expected
= IllegalArgumentException
.class)
112 public void testInvalidConstructor4() {
113 assertNotNull(new BufferedBlockingQueue
<>(-1, 1));
117 * Test with chunkSize = -1 and buffer queueSize = 1
119 * Should fail with an {@link IllegalArgumentException}
121 @Test(expected
= IllegalArgumentException
.class)
122 public void testInvalidConstructor5() {
123 assertNotNull(new BufferedBlockingQueue
<>(1, -1));
127 * Test inserting one element and removing it.
130 public void testSingleInsertion() {
131 Character element
= 'x';
132 charQueue
.put(element
);
133 charQueue
.flushInputBuffer();
135 Character out
= charQueue
.take();
136 assertEquals(element
, out
);
140 * Test insertion of elements that fit into the input buffer.
143 public void testSimpleInsertion() {
144 String string
= "Hello world!";
145 for (char elem
: string
.toCharArray()) {
148 charQueue
.flushInputBuffer();
150 StringBuilder sb
= new StringBuilder();
151 while (!charQueue
.isEmpty()) {
152 sb
.append(charQueue
.take());
154 assertEquals(string
, sb
.toString());
158 * Test insertion of elements that will require more than one input buffer.
161 public void testLargeInsertion() {
162 String string
= testString
.substring(0, 222);
163 for (char elem
: string
.toCharArray()) {
166 charQueue
.flushInputBuffer();
168 StringBuilder sb
= new StringBuilder();
169 while (!charQueue
.isEmpty()) {
170 sb
.append(charQueue
.take());
172 assertEquals(string
, sb
.toString());
176 * Test the state of the {@link BufferedBlockingQueue#isEmpty()} method at
180 public void testIsEmpty() {
181 BufferedBlockingQueue
<String
> stringQueue
= new BufferedBlockingQueue
<>(15, 15);
182 assertTrue(stringQueue
.isEmpty());
184 stringQueue
.put("Hello");
185 assertFalse(stringQueue
.isEmpty());
187 stringQueue
.flushInputBuffer();
188 assertFalse(stringQueue
.isEmpty());
190 stringQueue
.flushInputBuffer();
191 assertFalse(stringQueue
.isEmpty());
193 stringQueue
.flushInputBuffer();
195 assertTrue(stringQueue
.isEmpty());
197 stringQueue
.flushInputBuffer();
198 assertTrue(stringQueue
.isEmpty());
202 * Write random data in and read it, several times.
205 public void testOddInsertions() {
206 BufferedBlockingQueue
<Object
> objectQueue
= new BufferedBlockingQueue
<>(15, 15);
207 LinkedList
<Object
> expectedValues
= new LinkedList
<>();
208 Random rnd
= new Random();
211 for (int i
= 0; i
< 10; i
++) {
213 * The queue's total size is 225 (15x15). We must make sure to not
216 for (int j
= 0; j
< 50; j
++) {
217 Integer testInt
= rnd
.nextInt();
218 Long testLong
= rnd
.nextLong();
219 Double testDouble
= rnd
.nextDouble();
220 Double testGaussian
= rnd
.nextGaussian();
222 expectedValues
.add(testInt
);
223 expectedValues
.add(testLong
);
224 expectedValues
.add(testDouble
);
225 expectedValues
.add(testGaussian
);
226 objectQueue
.put(testInt
);
227 objectQueue
.put(testLong
);
228 objectQueue
.put(testDouble
);
229 objectQueue
.put(testGaussian
);
231 objectQueue
.flushInputBuffer();
233 while (!expectedValues
.isEmpty()) {
234 Object expected
= expectedValues
.removeFirst();
235 Object actual
= objectQueue
.take();
236 assertEquals(expected
, actual
);
242 * Read with a producer and a consumer
244 * @throws InterruptedException
245 * The test was interrupted
248 public void testMultiThread() throws InterruptedException
{
249 /* A character not found in the test string */
250 final Character lastElement
= '%';
252 Thread producer
= new Thread() {
255 for (char c
: testString
.toCharArray()) {
258 charQueue
.put(lastElement
);
259 charQueue
.flushInputBuffer();
264 Thread consumer
= new Thread() {
267 Character s
= charQueue
.take();
268 while (!s
.equals(lastElement
)) {
269 s
= charQueue
.take();
280 * Read with a producer and a consumer using
281 * {@link BufferedBlockingQueue#blockingPeek()}.
283 * @throws InterruptedException
284 * The test was interrupted
287 public void testBlockingPeek() throws InterruptedException
{
288 /* A character not found in the test string */
289 final Character lastElement
= '%';
291 final StringBuilder sb
= new StringBuilder();
293 Thread consumer
= new Thread() {
296 boolean isFinished
= false;
297 while (!isFinished
) {
298 // Read last element without removing it
299 Character s
= charQueue
.blockingPeek();
300 isFinished
= s
.equals(lastElement
);
311 Thread producer
= new Thread() {
314 for (char c
: testString
.toCharArray()) {
317 charQueue
.put(lastElement
);
318 charQueue
.flushInputBuffer();
326 assertEquals(testString
, sb
.toString());
330 * Test the contents returned by {@link BufferedBlockingQueue#iterator()}.
332 * The test is sequential, because the iterator has no guarantee wrt to its
333 * contents when run concurrently.
336 public void testIteratorContents() {
337 Deque
<Character
> expected
= new LinkedList
<>();
339 /* Iterator should be empty initially */
340 assertFalse(charQueue
.iterator().hasNext());
342 /* Insert the first 50 elements */
343 for (int i
= 0; i
< 50; i
++) {
344 char c
= testString
.charAt(i
);
346 expected
.addFirst(c
);
348 LinkedList
<Character
> actual
= new LinkedList
<>();
349 Iterators
.addAll(actual
, charQueue
.iterator());
350 assertSameElements(expected
, actual
);
353 * Insert more elements, flush the input buffer (should not affect the
356 for (int i
= 50; i
< 60; i
++) {
357 char c
= testString
.charAt(i
);
359 charQueue
.flushInputBuffer();
360 expected
.addFirst(c
);
362 actual
= new LinkedList
<>();
363 Iterators
.addAll(actual
, charQueue
.iterator());
364 assertSameElements(expected
, actual
);
366 /* Consume the 30 last elements from the queue */
367 for (int i
= 0; i
< 30; i
++) {
369 expected
.removeLast();
371 actual
= new LinkedList
<>();
372 Iterators
.addAll(actual
, charQueue
.iterator());
373 assertSameElements(expected
, actual
);
375 /* Now empty the queue */
376 while (!charQueue
.isEmpty()) {
378 expected
.removeLast();
380 assertFalse(charQueue
.iterator().hasNext());
384 * Utility method to verify that two collections contain the exact same
385 * elements, not necessarily in the same iteration order.
387 * {@link Collection#equals} requires the iteration order to be the same,
388 * which we do not want here.
390 * Using a {@link Set} or {@link Collection#containsAll} is not sufficient
391 * either, because those will throw away duplicate elements.
393 private static <T
> void assertSameElements(Collection
<T
> c1
, Collection
<T
> c2
) {
394 assertEquals(HashMultiset
.create(c1
), HashMultiset
.create(c2
));
398 * Test iterating on the queue while a producer and a consumer threads are
399 * using it. The iteration should not affect the elements taken by the
403 public void testConcurrentIteration() {
404 final BufferedBlockingQueue
<@NonNull String
> queue
= new BufferedBlockingQueue
<>(15, 15);
405 final String poisonPill
= "That's all folks!";
408 * Convert the test's testBuffer into an array of String, one for each
411 List
<String
> strings
= Chars
.asList(testString
.toCharArray()).stream()
412 .map(Object
::toString
)
413 .collect(Collectors
.toList());
415 Iterable
<Iterable
<String
>> results
=
416 runConcurrencyTest(queue
, strings
, poisonPill
, 1, 1, 1);
418 assertEquals(strings
, Iterables
.getOnlyElement(results
));
422 * Run a concurrency test on a {@link BufferedBlockingQueue}, with the
423 * specified number of producer, consumer and observer/iterator threads.
425 * The returned value represents the elements consumed by each consumer
426 * thread. Thus, if there is one consumer, the top-level {@link Iterable}
427 * will be of size 1, and the inner one should contain all the elements that
431 * The queue to run the test on
433 * The data set to insert in the queue. Every producer will
434 * insert one entire set.
436 * The "poison pill" to indicate the end. Simply make sure it is
437 * a element of type <T> that is not present in the 'testBuffer'.
438 * @param nbProducerThreads
439 * Number of producer threads. There should be at least 1.
440 * @param nbConsumerThreads
441 * Number of consumer threads. There should be at least 1.
442 * @param nbObserverThreads
443 * Number of observer threads. It should be >= 0.
444 * @return The consumed elements, as seen by each consumer thread.
446 private static <T
> Iterable
<Iterable
<T
>> runConcurrencyTest(final BufferedBlockingQueue
<@NonNull T
> queue
,
447 final List
<T
> testBuffer
,
448 final @NonNull T poisonPill
,
449 int nbProducerThreads
,
450 int nbConsumerThreads
,
451 int nbObserverThreads
) {
453 final class ProducerThread
implements Runnable
{
456 for (int i
= 0; i
< testBuffer
.size(); i
++) {
457 T elem
= testBuffer
.get(i
);
459 // TODO replace with List<@NonNull T> once we can
460 throw new IllegalArgumentException();
464 queue
.put(poisonPill
);
465 queue
.flushInputBuffer();
470 * The consumer thread will return the elements it read via its Future.
472 * Note that if there are multiple consumers, there is no guarantee with
473 * regards the contents of an individual one.
475 final class ConsumerThread
implements Callable
<Iterable
<T
>> {
477 public Iterable
<T
> call() {
478 List
<T
> results
= new LinkedList
<>();
479 T elem
= queue
.take();
480 while (!elem
.equals(poisonPill
)) {
488 final class ObserverThread
implements Runnable
{
491 for (int i
= 0; i
< 5; i
++) {
492 final Set
<T
> results
= new HashSet
<>();
493 for (T input
: queue
) {
495 * Do something with the element so that this iteration
496 * does not get optimized out.
504 if (nbProducerThreads
< 1 || nbConsumerThreads
< 1 || nbObserverThreads
< 0) {
505 throw new IllegalArgumentException();
508 final ExecutorService pool
= Executors
.newFixedThreadPool(
509 nbProducerThreads
+ nbConsumerThreads
+ nbObserverThreads
);
511 /* Consumed elements, per consumer thread */
512 List
<Future
<Iterable
<T
>>> consumedElements
= new LinkedList
<>();
514 for (int i
= 0; i
< nbProducerThreads
; i
++) {
515 pool
.submit(new ProducerThread());
517 for (int i
= 0; i
< nbConsumerThreads
; i
++) {
518 consumedElements
.add(pool
.submit(new ConsumerThread()));
520 for (int i
= 0; i
< nbObserverThreads
; i
++) {
521 pool
.submit(new ObserverThread());
524 List
<Iterable
<T
>> results
= new LinkedList
<>();
526 /* Convert the Future's to the actual return value */
527 for (Future
<Iterable
<T
>> future
: consumedElements
) {
528 Iterable
<T
> threadResult
= future
.get();
529 results
.add(threadResult
);
533 boolean success
= pool
.awaitTermination(2, TimeUnit
.MINUTES
);
535 throw new InterruptedException();
538 } catch (ExecutionException
| InterruptedException e
) {
539 fail(e
.getMessage());