Replace instances of FluentIterable with Java 8 Streams
[deliverable/tracecompass.git] / common / org.eclipse.tracecompass.common.core.tests / src / org / eclipse / tracecompass / common / core / tests / collect / BufferedBlockingQueueTest.java
CommitLineData
9d979fda
MK
1/*******************************************************************************
2 * Copyright (c) 2015 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Matthew Khouzam - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.tracecompass.common.core.tests.collect;
14
9d979fda
MK
15import static org.junit.Assert.assertEquals;
16import static org.junit.Assert.assertFalse;
17import static org.junit.Assert.assertTrue;
fadd7888 18import static org.junit.Assert.fail;
9d979fda 19
47c79d9f
AM
20import java.util.Collection;
21import java.util.Deque;
d6e2666b 22import java.util.HashSet;
9d979fda 23import java.util.LinkedList;
fadd7888 24import java.util.List;
9d979fda 25import java.util.Random;
d6e2666b 26import java.util.Set;
9d979fda
MK
27import java.util.concurrent.Callable;
28import java.util.concurrent.ExecutionException;
29import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
31import java.util.concurrent.Future;
32import java.util.concurrent.TimeUnit;
bbadfd0a 33import java.util.stream.Collectors;
9d979fda 34
fadd7888 35import org.eclipse.jdt.annotation.NonNull;
9d979fda
MK
36import org.eclipse.tracecompass.common.core.NonNullUtils;
37import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue;
38import org.junit.Before;
39import org.junit.Rule;
40import org.junit.Test;
41import org.junit.rules.TestRule;
42import org.junit.rules.Timeout;
43
47c79d9f 44import com.google.common.collect.HashMultiset;
fadd7888 45import com.google.common.collect.Iterables;
47c79d9f 46import com.google.common.collect.Iterators;
fadd7888 47import com.google.common.primitives.Chars;
47c79d9f 48
9d979fda
MK
49/**
50 * Test suite for the {@link BufferedBlockingQueue}
51 */
52public class BufferedBlockingQueueTest {
53
54 /** Timeout the tests after 2 minutes */
55 @Rule
d291a715 56 public TestRule timeoutRule = new Timeout(2, TimeUnit.MINUTES);
9d979fda 57
47c79d9f
AM
58 private static final String testString = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
59 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
60 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
61
9d979fda
MK
62 private BufferedBlockingQueue<Character> charQueue;
63
64 /**
65 * Test setup
66 */
67 @Before
68 public void init() {
69 charQueue = new BufferedBlockingQueue<>(15, 15);
70 }
71
72 /**
73 * Test inserting one element and removing it.
74 */
75 @Test
76 public void testSingleInsertion() {
77 Character element = 'x';
78 charQueue.put(element);
79 charQueue.flushInputBuffer();
80
81 Character out = charQueue.take();
82 assertEquals(element, out);
83 }
84
85 /**
86 * Test insertion of elements that fit into the input buffer.
87 */
88 @Test
89 public void testSimpleInsertion() {
90 String string = "Hello world!";
91 for (char elem : string.toCharArray()) {
92 charQueue.put(elem);
93 }
94 charQueue.flushInputBuffer();
95
96 StringBuilder sb = new StringBuilder();
97 while (!charQueue.isEmpty()) {
98 sb.append(charQueue.take());
99 }
100 assertEquals(string, sb.toString());
101 }
102
103 /**
104 * Test insertion of elements that will require more than one input buffer.
105 */
106 @Test
107 public void testLargeInsertion() {
108 String string = testString.substring(0, 222);
109 for (char elem : string.toCharArray()) {
110 charQueue.put(elem);
111 }
112 charQueue.flushInputBuffer();
113
114 StringBuilder sb = new StringBuilder();
115 while (!charQueue.isEmpty()) {
116 sb.append(charQueue.take());
117 }
118 assertEquals(string, sb.toString());
119 }
120
121 /**
122 * Test the state of the {@link BufferedBlockingQueue#isEmpty()} method at
123 * various moments.
124 */
125 @Test
126 public void testIsEmpty() {
127 BufferedBlockingQueue<String> stringQueue = new BufferedBlockingQueue<>(15, 15);
128 assertTrue(stringQueue.isEmpty());
129
130 stringQueue.put("Hello");
131 assertFalse(stringQueue.isEmpty());
132
133 stringQueue.flushInputBuffer();
134 assertFalse(stringQueue.isEmpty());
135
136 stringQueue.flushInputBuffer();
137 assertFalse(stringQueue.isEmpty());
138
139 stringQueue.flushInputBuffer();
140 stringQueue.take();
141 assertTrue(stringQueue.isEmpty());
142
143 stringQueue.flushInputBuffer();
144 assertTrue(stringQueue.isEmpty());
145 }
146
147 /**
148 * Write random data in and read it, several times.
149 */
150 @Test
151 public void testOddInsertions() {
152 BufferedBlockingQueue<Object> objectQueue = new BufferedBlockingQueue<>(15, 15);
153 LinkedList<Object> expectedValues = new LinkedList<>();
154 Random rnd = new Random();
155 rnd.setSeed(123);
156
157 for (int i = 0; i < 10; i++) {
158 /*
159 * The queue's total size is 225 (15x15). We must make sure to not
160 * fill it up here!
161 */
162 for (int j = 0; j < 50; j++) {
163 Integer testInt = NonNullUtils.checkNotNull(rnd.nextInt());
164 Long testLong = NonNullUtils.checkNotNull(rnd.nextLong());
165 Double testDouble = NonNullUtils.checkNotNull(rnd.nextDouble());
166 Double testGaussian = NonNullUtils.checkNotNull(rnd.nextGaussian());
167
168 expectedValues.add(testInt);
169 expectedValues.add(testLong);
170 expectedValues.add(testDouble);
171 expectedValues.add(testGaussian);
172 objectQueue.put(testInt);
173 objectQueue.put(testLong);
174 objectQueue.put(testDouble);
175 objectQueue.put(testGaussian);
176 }
177 objectQueue.flushInputBuffer();
178
179 while (!expectedValues.isEmpty()) {
180 Object expected = expectedValues.removeFirst();
181 Object actual = objectQueue.take();
182 assertEquals(expected, actual);
183 }
184 }
185 }
186
187 /**
188 * Read with a producer and a consumer
189 *
190 * @throws InterruptedException
191 * The test was interrupted
192 */
193 @Test
194 public void testMultiThread() throws InterruptedException {
195 /* A character not found in the test string */
196 final Character lastElement = '%';
197
198 Thread producer = new Thread() {
199 @Override
200 public void run() {
201 for (char c : testString.toCharArray()) {
202 charQueue.put(c);
203 }
204 charQueue.put(lastElement);
205 charQueue.flushInputBuffer();
206 }
207 };
208 producer.start();
209
210 Thread consumer = new Thread() {
211 @Override
212 public void run() {
213 Character s = charQueue.take();
214 while (!s.equals(lastElement)) {
215 s = charQueue.take();
216 }
217 }
218 };
219 consumer.start();
220
221 consumer.join();
222 producer.join();
223 }
224
49698f83
BH
225 /**
226 * Read with a producer and a consumer using
227 * {@link BufferedBlockingQueue#blockingPeek()}.
228 *
229 * @throws InterruptedException
230 * The test was interrupted
231 */
232 @Test
233 public void testBlockingPeek() throws InterruptedException {
234 /* A character not found in the test string */
235 final Character lastElement = '%';
236
237 final StringBuilder sb = new StringBuilder();
238
239 Thread consumer = new Thread() {
240 @Override
241 public void run() {
242 boolean isFinished = false;
243 while (!isFinished) {
244 // Read last element without removing it
245 Character s = charQueue.blockingPeek();
246 isFinished = s.equals(lastElement);
247 if (!isFinished) {
248 sb.append(s);
249 }
250 // Remove element
251 charQueue.take();
252 }
253 }
254 };
255 consumer.start();
256
257 Thread producer = new Thread() {
258 @Override
259 public void run() {
260 for (char c : testString.toCharArray()) {
261 charQueue.put(c);
262 }
263 charQueue.put(lastElement);
264 charQueue.flushInputBuffer();
265 }
266 };
267 producer.start();
268
269 producer.join();
270 consumer.join();
271
272 assertEquals(testString, sb.toString());
273 }
274
47c79d9f
AM
275 /**
276 * Test the contents returned by {@link BufferedBlockingQueue#iterator()}.
277 *
278 * The test is sequential, because the iterator has no guarantee wrt to its
279 * contents when run concurrently.
280 */
281 @Test
282 public void testIteratorContents() {
283 Deque<Character> expected = new LinkedList<>();
284
285 /* Iterator should be empty initially */
286 assertFalse(charQueue.iterator().hasNext());
287
288 /* Insert the first 50 elements */
289 for (int i = 0; i < 50; i++) {
290 char c = testString.charAt(i);
291 charQueue.put(c);
292 expected.addFirst(c);
293 }
294 LinkedList<Character> actual = new LinkedList<>();
295 Iterators.addAll(actual, charQueue.iterator());
296 assertSameElements(expected, actual);
297
298 /*
299 * Insert more elements, flush the input buffer (should not affect the
300 * iteration).
301 */
302 for (int i = 50; i < 60; i++) {
303 char c = testString.charAt(i);
304 charQueue.put(c);
305 charQueue.flushInputBuffer();
306 expected.addFirst(c);
307 }
308 actual = new LinkedList<>();
309 Iterators.addAll(actual, charQueue.iterator());
310 assertSameElements(expected, actual);
311
312 /* Consume the 30 last elements from the queue */
313 for (int i = 0; i < 30; i++) {
314 charQueue.take();
315 expected.removeLast();
316 }
317 actual = new LinkedList<>();
318 Iterators.addAll(actual, charQueue.iterator());
319 assertSameElements(expected, actual);
320
321 /* Now empty the queue */
322 while (!charQueue.isEmpty()) {
323 charQueue.take();
324 expected.removeLast();
325 }
326 assertFalse(charQueue.iterator().hasNext());
327 }
328
329 /**
330 * Utility method to verify that two collections contain the exact same
331 * elements, not necessarily in the same iteration order.
332 *
333 * {@link Collection#equals} requires the iteration order to be the same,
334 * which we do not want here.
335 *
336 * Using a {@link Set} or {@link Collection#containsAll} is not sufficient
337 * either, because those will throw away duplicate elements.
338 */
339 private static <T> void assertSameElements(Collection<T> c1, Collection<T> c2) {
340 assertEquals(HashMultiset.create(c1), HashMultiset.create(c2));
341 }
342
9d979fda 343 /**
d6e2666b
AM
344 * Test iterating on the queue while a producer and a consumer threads are
345 * using it. The iteration should not affect the elements taken by the
346 * consumer.
9d979fda
MK
347 */
348 @Test
fadd7888 349 public void testConcurrentIteration() {
d6e2666b 350 final BufferedBlockingQueue<String> queue = new BufferedBlockingQueue<>(15, 15);
fadd7888 351 final String poisonPill = "That's all folks!";
9d979fda 352
fadd7888
AM
353 /*
354 * Convert the test's testBuffer into an array of String, one for each
bbadfd0a 355 * character.
fadd7888 356 */
bbadfd0a
AM
357 List<String> strings = Chars.asList(testString.toCharArray()).stream()
358 .map(Object::toString)
359 .collect(Collectors.toList());
9d979fda 360
fadd7888
AM
361 Iterable<Iterable<String>> results =
362 runConcurrencyTest(queue, strings, poisonPill, 1, 1, 1);
363
364 assertEquals(strings, Iterables.getOnlyElement(results));
365 }
9d979fda 366
fadd7888
AM
367 /**
368 * Run a concurrency test on a {@link BufferedBlockingQueue}, with the
369 * specified number of producer, consumer and observer/iterator threads.
370 *
371 * The returned value represents the elements consumed by each consumer
372 * thread. Thus, if there is one consumer, the top-level {@link Iterable}
373 * will be of size 1, and the inner one should contain all the elements that
374 * were inserted.
375 *
376 * @param queue
377 * The queue to run the test on
378 * @param testBuffer
379 * The data set to insert in the queue. Every producer will
380 * insert one entire set.
381 * @param poisonPill
382 * The "poison pill" to indicate the end. Simply make sure it is
383 * a element of type <T> that is not present in the 'testBuffer'.
384 * @param nbProducerThreads
385 * Number of producer threads. There should be at least 1.
386 * @param nbConsumerThreads
387 * Number of consumer threads. There should be at least 1.
388 * @param nbObserverThreads
389 * Number of observer threads. It should be >= 0.
390 * @return The consumed elements, as seen by each consumer thread.
391 */
392 private static <T> Iterable<Iterable<T>> runConcurrencyTest(final BufferedBlockingQueue<T> queue,
393 final List<T> testBuffer,
394 final @NonNull T poisonPill,
395 int nbProducerThreads,
396 int nbConsumerThreads,
397 int nbObserverThreads) {
398
399 final class ProducerThread implements Runnable {
9d979fda
MK
400 @Override
401 public void run() {
fadd7888
AM
402 for (int i = 0; i < testBuffer.size(); i++) {
403 T elem = testBuffer.get(i);
404 if (elem == null) {
405 // TODO replace with List<@NonNull T> once we can
406 throw new IllegalArgumentException();
407 }
408 queue.put(elem);
9d979fda 409 }
d6e2666b
AM
410 queue.put(poisonPill);
411 queue.flushInputBuffer();
9d979fda 412 }
fadd7888 413 }
9d979fda 414
fadd7888
AM
415 /**
416 * The consumer thread will return the elements it read via its Future.
417 *
418 * Note that if there are multiple consumers, there is no guarantee with
419 * regards the contents of an individual one.
420 */
421 final class ConsumerThread implements Callable<Iterable<T>> {
9d979fda 422 @Override
fadd7888
AM
423 public Iterable<T> call() {
424 List<T> results = new LinkedList<>();
425 T elem = queue.take();
426 while (!elem.equals(poisonPill)) {
427 results.add(elem);
428 elem = queue.take();
9d979fda 429 }
fadd7888 430 return results;
9d979fda 431 }
fadd7888 432 }
9d979fda 433
fadd7888 434 final class ObserverThread implements Runnable {
9d979fda
MK
435 @Override
436 public void run() {
fadd7888
AM
437 for (int i = 0; i < 5; i++) {
438 final Set<T> results = new HashSet<>();
439 for (T input : queue) {
440 /*
441 * Do something with the element so that this iteration
442 * does not get optimized out.
443 */
d6e2666b 444 results.add(input);
9d979fda
MK
445 }
446 }
9d979fda 447 }
fadd7888 448 }
9d979fda 449
fadd7888
AM
450 if (nbProducerThreads < 1 || nbConsumerThreads < 1 || nbObserverThreads < 0) {
451 throw new IllegalArgumentException();
452 }
9d979fda 453
fadd7888
AM
454 final ExecutorService pool = Executors.newFixedThreadPool(
455 nbProducerThreads + nbConsumerThreads + nbObserverThreads);
9d979fda 456
fadd7888
AM
457 /* Consumed elements, per consumer thread */
458 List<Future<Iterable<T>>> consumedElements = new LinkedList<>();
9d979fda 459
fadd7888
AM
460 for (int i = 0; i < nbProducerThreads; i++) {
461 pool.submit(new ProducerThread());
462 }
463 for (int i = 0; i < nbConsumerThreads; i++) {
464 consumedElements.add(pool.submit(new ConsumerThread()));
465 }
466 for (int i = 0; i < nbObserverThreads; i++) {
467 pool.submit(new ObserverThread());
468 }
469
470 List<Iterable<T>> results = new LinkedList<>();
471 try {
472 /* Convert the Future's to the actual return value */
473 for (Future<Iterable<T>> future : consumedElements) {
474 Iterable<T> threadResult = future.get();
475 results.add(threadResult);
476 }
477
478 pool.shutdown();
479 boolean success = pool.awaitTermination(2, TimeUnit.MINUTES);
480 if (!success) {
481 throw new InterruptedException();
482 }
483
484 } catch (ExecutionException | InterruptedException e) {
485 fail(e.getMessage());
486 }
487
488 return results;
489 }
47c79d9f 490
fadd7888 491}
This page took 0.052207 seconds and 5 git commands to generate.