Fix new null warnings
[deliverable/tracecompass.git] / common / org.eclipse.tracecompass.common.core.tests / src / org / eclipse / tracecompass / common / core / tests / collect / BufferedBlockingQueueTest.java
CommitLineData
9d979fda
MK
1/*******************************************************************************
2 * Copyright (c) 2015 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Matthew Khouzam - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.tracecompass.common.core.tests.collect;
14
9d979fda
MK
15import static org.junit.Assert.assertEquals;
16import static org.junit.Assert.assertFalse;
17import static org.junit.Assert.assertTrue;
fadd7888 18import static org.junit.Assert.fail;
9d979fda 19
47c79d9f
AM
20import java.util.Collection;
21import java.util.Deque;
d6e2666b 22import java.util.HashSet;
9d979fda 23import java.util.LinkedList;
fadd7888 24import java.util.List;
9d979fda 25import java.util.Random;
d6e2666b 26import java.util.Set;
9d979fda
MK
27import java.util.concurrent.Callable;
28import java.util.concurrent.ExecutionException;
29import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
31import java.util.concurrent.Future;
32import java.util.concurrent.TimeUnit;
bbadfd0a 33import java.util.stream.Collectors;
9d979fda 34
fadd7888 35import org.eclipse.jdt.annotation.NonNull;
9d979fda
MK
36import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue;
37import org.junit.Before;
38import org.junit.Rule;
39import org.junit.Test;
40import org.junit.rules.TestRule;
41import org.junit.rules.Timeout;
42
47c79d9f 43import com.google.common.collect.HashMultiset;
fadd7888 44import com.google.common.collect.Iterables;
47c79d9f 45import com.google.common.collect.Iterators;
fadd7888 46import com.google.common.primitives.Chars;
47c79d9f 47
9d979fda
MK
48/**
49 * Test suite for the {@link BufferedBlockingQueue}
50 */
51public class BufferedBlockingQueueTest {
52
53 /** Timeout the tests after 2 minutes */
54 @Rule
d291a715 55 public TestRule timeoutRule = new Timeout(2, TimeUnit.MINUTES);
9d979fda 56
47c79d9f
AM
57 private static final String testString = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
58 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
59 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
60
9d979fda
MK
61 private BufferedBlockingQueue<Character> charQueue;
62
63 /**
64 * Test setup
65 */
66 @Before
67 public void init() {
68 charQueue = new BufferedBlockingQueue<>(15, 15);
69 }
70
71 /**
72 * Test inserting one element and removing it.
73 */
74 @Test
75 public void testSingleInsertion() {
76 Character element = 'x';
77 charQueue.put(element);
78 charQueue.flushInputBuffer();
79
80 Character out = charQueue.take();
81 assertEquals(element, out);
82 }
83
84 /**
85 * Test insertion of elements that fit into the input buffer.
86 */
87 @Test
88 public void testSimpleInsertion() {
89 String string = "Hello world!";
90 for (char elem : string.toCharArray()) {
91 charQueue.put(elem);
92 }
93 charQueue.flushInputBuffer();
94
95 StringBuilder sb = new StringBuilder();
96 while (!charQueue.isEmpty()) {
97 sb.append(charQueue.take());
98 }
99 assertEquals(string, sb.toString());
100 }
101
102 /**
103 * Test insertion of elements that will require more than one input buffer.
104 */
105 @Test
106 public void testLargeInsertion() {
107 String string = testString.substring(0, 222);
108 for (char elem : string.toCharArray()) {
109 charQueue.put(elem);
110 }
111 charQueue.flushInputBuffer();
112
113 StringBuilder sb = new StringBuilder();
114 while (!charQueue.isEmpty()) {
115 sb.append(charQueue.take());
116 }
117 assertEquals(string, sb.toString());
118 }
119
120 /**
121 * Test the state of the {@link BufferedBlockingQueue#isEmpty()} method at
122 * various moments.
123 */
124 @Test
125 public void testIsEmpty() {
126 BufferedBlockingQueue<String> stringQueue = new BufferedBlockingQueue<>(15, 15);
127 assertTrue(stringQueue.isEmpty());
128
129 stringQueue.put("Hello");
130 assertFalse(stringQueue.isEmpty());
131
132 stringQueue.flushInputBuffer();
133 assertFalse(stringQueue.isEmpty());
134
135 stringQueue.flushInputBuffer();
136 assertFalse(stringQueue.isEmpty());
137
138 stringQueue.flushInputBuffer();
139 stringQueue.take();
140 assertTrue(stringQueue.isEmpty());
141
142 stringQueue.flushInputBuffer();
143 assertTrue(stringQueue.isEmpty());
144 }
145
146 /**
147 * Write random data in and read it, several times.
148 */
149 @Test
150 public void testOddInsertions() {
151 BufferedBlockingQueue<Object> objectQueue = new BufferedBlockingQueue<>(15, 15);
152 LinkedList<Object> expectedValues = new LinkedList<>();
153 Random rnd = new Random();
154 rnd.setSeed(123);
155
156 for (int i = 0; i < 10; i++) {
157 /*
158 * The queue's total size is 225 (15x15). We must make sure to not
159 * fill it up here!
160 */
161 for (int j = 0; j < 50; j++) {
0e4f957e
AM
162 Integer testInt = rnd.nextInt();
163 Long testLong = rnd.nextLong();
164 Double testDouble = rnd.nextDouble();
165 Double testGaussian = rnd.nextGaussian();
9d979fda
MK
166
167 expectedValues.add(testInt);
168 expectedValues.add(testLong);
169 expectedValues.add(testDouble);
170 expectedValues.add(testGaussian);
171 objectQueue.put(testInt);
172 objectQueue.put(testLong);
173 objectQueue.put(testDouble);
174 objectQueue.put(testGaussian);
175 }
176 objectQueue.flushInputBuffer();
177
178 while (!expectedValues.isEmpty()) {
179 Object expected = expectedValues.removeFirst();
180 Object actual = objectQueue.take();
181 assertEquals(expected, actual);
182 }
183 }
184 }
185
186 /**
187 * Read with a producer and a consumer
188 *
189 * @throws InterruptedException
190 * The test was interrupted
191 */
192 @Test
193 public void testMultiThread() throws InterruptedException {
194 /* A character not found in the test string */
195 final Character lastElement = '%';
196
197 Thread producer = new Thread() {
198 @Override
199 public void run() {
200 for (char c : testString.toCharArray()) {
201 charQueue.put(c);
202 }
203 charQueue.put(lastElement);
204 charQueue.flushInputBuffer();
205 }
206 };
207 producer.start();
208
209 Thread consumer = new Thread() {
210 @Override
211 public void run() {
212 Character s = charQueue.take();
213 while (!s.equals(lastElement)) {
214 s = charQueue.take();
215 }
216 }
217 };
218 consumer.start();
219
220 consumer.join();
221 producer.join();
222 }
223
49698f83
BH
224 /**
225 * Read with a producer and a consumer using
226 * {@link BufferedBlockingQueue#blockingPeek()}.
227 *
228 * @throws InterruptedException
229 * The test was interrupted
230 */
231 @Test
232 public void testBlockingPeek() throws InterruptedException {
233 /* A character not found in the test string */
234 final Character lastElement = '%';
235
236 final StringBuilder sb = new StringBuilder();
237
238 Thread consumer = new Thread() {
239 @Override
240 public void run() {
241 boolean isFinished = false;
242 while (!isFinished) {
243 // Read last element without removing it
244 Character s = charQueue.blockingPeek();
245 isFinished = s.equals(lastElement);
246 if (!isFinished) {
247 sb.append(s);
248 }
249 // Remove element
250 charQueue.take();
251 }
252 }
253 };
254 consumer.start();
255
256 Thread producer = new Thread() {
257 @Override
258 public void run() {
259 for (char c : testString.toCharArray()) {
260 charQueue.put(c);
261 }
262 charQueue.put(lastElement);
263 charQueue.flushInputBuffer();
264 }
265 };
266 producer.start();
267
268 producer.join();
269 consumer.join();
270
271 assertEquals(testString, sb.toString());
272 }
273
47c79d9f
AM
274 /**
275 * Test the contents returned by {@link BufferedBlockingQueue#iterator()}.
276 *
277 * The test is sequential, because the iterator has no guarantee wrt to its
278 * contents when run concurrently.
279 */
280 @Test
281 public void testIteratorContents() {
282 Deque<Character> expected = new LinkedList<>();
283
284 /* Iterator should be empty initially */
285 assertFalse(charQueue.iterator().hasNext());
286
287 /* Insert the first 50 elements */
288 for (int i = 0; i < 50; i++) {
289 char c = testString.charAt(i);
290 charQueue.put(c);
291 expected.addFirst(c);
292 }
293 LinkedList<Character> actual = new LinkedList<>();
294 Iterators.addAll(actual, charQueue.iterator());
295 assertSameElements(expected, actual);
296
297 /*
298 * Insert more elements, flush the input buffer (should not affect the
299 * iteration).
300 */
301 for (int i = 50; i < 60; i++) {
302 char c = testString.charAt(i);
303 charQueue.put(c);
304 charQueue.flushInputBuffer();
305 expected.addFirst(c);
306 }
307 actual = new LinkedList<>();
308 Iterators.addAll(actual, charQueue.iterator());
309 assertSameElements(expected, actual);
310
311 /* Consume the 30 last elements from the queue */
312 for (int i = 0; i < 30; i++) {
313 charQueue.take();
314 expected.removeLast();
315 }
316 actual = new LinkedList<>();
317 Iterators.addAll(actual, charQueue.iterator());
318 assertSameElements(expected, actual);
319
320 /* Now empty the queue */
321 while (!charQueue.isEmpty()) {
322 charQueue.take();
323 expected.removeLast();
324 }
325 assertFalse(charQueue.iterator().hasNext());
326 }
327
328 /**
329 * Utility method to verify that two collections contain the exact same
330 * elements, not necessarily in the same iteration order.
331 *
332 * {@link Collection#equals} requires the iteration order to be the same,
333 * which we do not want here.
334 *
335 * Using a {@link Set} or {@link Collection#containsAll} is not sufficient
336 * either, because those will throw away duplicate elements.
337 */
338 private static <T> void assertSameElements(Collection<T> c1, Collection<T> c2) {
339 assertEquals(HashMultiset.create(c1), HashMultiset.create(c2));
340 }
341
9d979fda 342 /**
d6e2666b
AM
343 * Test iterating on the queue while a producer and a consumer threads are
344 * using it. The iteration should not affect the elements taken by the
345 * consumer.
9d979fda
MK
346 */
347 @Test
fadd7888 348 public void testConcurrentIteration() {
5b3fe39a 349 final BufferedBlockingQueue<@NonNull String> queue = new BufferedBlockingQueue<>(15, 15);
fadd7888 350 final String poisonPill = "That's all folks!";
9d979fda 351
fadd7888
AM
352 /*
353 * Convert the test's testBuffer into an array of String, one for each
bbadfd0a 354 * character.
fadd7888 355 */
bbadfd0a
AM
356 List<String> strings = Chars.asList(testString.toCharArray()).stream()
357 .map(Object::toString)
358 .collect(Collectors.toList());
9d979fda 359
fadd7888
AM
360 Iterable<Iterable<String>> results =
361 runConcurrencyTest(queue, strings, poisonPill, 1, 1, 1);
362
363 assertEquals(strings, Iterables.getOnlyElement(results));
364 }
9d979fda 365
fadd7888
AM
366 /**
367 * Run a concurrency test on a {@link BufferedBlockingQueue}, with the
368 * specified number of producer, consumer and observer/iterator threads.
369 *
370 * The returned value represents the elements consumed by each consumer
371 * thread. Thus, if there is one consumer, the top-level {@link Iterable}
372 * will be of size 1, and the inner one should contain all the elements that
373 * were inserted.
374 *
375 * @param queue
376 * The queue to run the test on
377 * @param testBuffer
378 * The data set to insert in the queue. Every producer will
379 * insert one entire set.
380 * @param poisonPill
381 * The "poison pill" to indicate the end. Simply make sure it is
382 * a element of type <T> that is not present in the 'testBuffer'.
383 * @param nbProducerThreads
384 * Number of producer threads. There should be at least 1.
385 * @param nbConsumerThreads
386 * Number of consumer threads. There should be at least 1.
387 * @param nbObserverThreads
388 * Number of observer threads. It should be >= 0.
389 * @return The consumed elements, as seen by each consumer thread.
390 */
5b3fe39a 391 private static <T> Iterable<Iterable<T>> runConcurrencyTest(final BufferedBlockingQueue<@NonNull T> queue,
fadd7888
AM
392 final List<T> testBuffer,
393 final @NonNull T poisonPill,
394 int nbProducerThreads,
395 int nbConsumerThreads,
396 int nbObserverThreads) {
397
398 final class ProducerThread implements Runnable {
9d979fda
MK
399 @Override
400 public void run() {
fadd7888
AM
401 for (int i = 0; i < testBuffer.size(); i++) {
402 T elem = testBuffer.get(i);
403 if (elem == null) {
404 // TODO replace with List<@NonNull T> once we can
405 throw new IllegalArgumentException();
406 }
407 queue.put(elem);
9d979fda 408 }
d6e2666b
AM
409 queue.put(poisonPill);
410 queue.flushInputBuffer();
9d979fda 411 }
fadd7888 412 }
9d979fda 413
fadd7888
AM
414 /**
415 * The consumer thread will return the elements it read via its Future.
416 *
417 * Note that if there are multiple consumers, there is no guarantee with
418 * regards the contents of an individual one.
419 */
420 final class ConsumerThread implements Callable<Iterable<T>> {
9d979fda 421 @Override
fadd7888
AM
422 public Iterable<T> call() {
423 List<T> results = new LinkedList<>();
424 T elem = queue.take();
425 while (!elem.equals(poisonPill)) {
426 results.add(elem);
427 elem = queue.take();
9d979fda 428 }
fadd7888 429 return results;
9d979fda 430 }
fadd7888 431 }
9d979fda 432
fadd7888 433 final class ObserverThread implements Runnable {
9d979fda
MK
434 @Override
435 public void run() {
fadd7888
AM
436 for (int i = 0; i < 5; i++) {
437 final Set<T> results = new HashSet<>();
438 for (T input : queue) {
439 /*
440 * Do something with the element so that this iteration
441 * does not get optimized out.
442 */
d6e2666b 443 results.add(input);
9d979fda
MK
444 }
445 }
9d979fda 446 }
fadd7888 447 }
9d979fda 448
fadd7888
AM
449 if (nbProducerThreads < 1 || nbConsumerThreads < 1 || nbObserverThreads < 0) {
450 throw new IllegalArgumentException();
451 }
9d979fda 452
fadd7888
AM
453 final ExecutorService pool = Executors.newFixedThreadPool(
454 nbProducerThreads + nbConsumerThreads + nbObserverThreads);
9d979fda 455
fadd7888
AM
456 /* Consumed elements, per consumer thread */
457 List<Future<Iterable<T>>> consumedElements = new LinkedList<>();
9d979fda 458
fadd7888
AM
459 for (int i = 0; i < nbProducerThreads; i++) {
460 pool.submit(new ProducerThread());
461 }
462 for (int i = 0; i < nbConsumerThreads; i++) {
463 consumedElements.add(pool.submit(new ConsumerThread()));
464 }
465 for (int i = 0; i < nbObserverThreads; i++) {
466 pool.submit(new ObserverThread());
467 }
468
469 List<Iterable<T>> results = new LinkedList<>();
470 try {
471 /* Convert the Future's to the actual return value */
472 for (Future<Iterable<T>> future : consumedElements) {
473 Iterable<T> threadResult = future.get();
474 results.add(threadResult);
475 }
476
477 pool.shutdown();
478 boolean success = pool.awaitTermination(2, TimeUnit.MINUTES);
479 if (!success) {
480 throw new InterruptedException();
481 }
482
483 } catch (ExecutionException | InterruptedException e) {
484 fail(e.getMessage());
485 }
486
487 return results;
488 }
47c79d9f 489
fadd7888 490}
This page took 0.070011 seconds and 5 git commands to generate.