common.core: make BufferedBlockingQueue validate inputs
[deliverable/tracecompass.git] / common / org.eclipse.tracecompass.common.core.tests / src / org / eclipse / tracecompass / common / core / tests / collect / BufferedBlockingQueueTest.java
CommitLineData
9d979fda
MK
1/*******************************************************************************
2 * Copyright (c) 2015 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Matthew Khouzam - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.tracecompass.common.core.tests.collect;
14
9d979fda
MK
15import static org.junit.Assert.assertEquals;
16import static org.junit.Assert.assertFalse;
17import static org.junit.Assert.assertTrue;
289a287e 18import static org.junit.Assert.assertNotNull;
fadd7888 19import static org.junit.Assert.fail;
9d979fda 20
47c79d9f
AM
21import java.util.Collection;
22import java.util.Deque;
d6e2666b 23import java.util.HashSet;
9d979fda 24import java.util.LinkedList;
fadd7888 25import java.util.List;
9d979fda 26import java.util.Random;
d6e2666b 27import java.util.Set;
9d979fda
MK
28import java.util.concurrent.Callable;
29import java.util.concurrent.ExecutionException;
30import java.util.concurrent.ExecutorService;
31import java.util.concurrent.Executors;
32import java.util.concurrent.Future;
33import java.util.concurrent.TimeUnit;
bbadfd0a 34import java.util.stream.Collectors;
9d979fda 35
fadd7888 36import org.eclipse.jdt.annotation.NonNull;
9d979fda
MK
37import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue;
38import org.junit.Before;
39import org.junit.Rule;
40import org.junit.Test;
41import org.junit.rules.TestRule;
42import org.junit.rules.Timeout;
43
47c79d9f 44import com.google.common.collect.HashMultiset;
fadd7888 45import com.google.common.collect.Iterables;
47c79d9f 46import com.google.common.collect.Iterators;
fadd7888 47import com.google.common.primitives.Chars;
47c79d9f 48
9d979fda
MK
49/**
50 * Test suite for the {@link BufferedBlockingQueue}
51 */
52public class BufferedBlockingQueueTest {
53
54 /** Timeout the tests after 2 minutes */
55 @Rule
d291a715 56 public TestRule timeoutRule = new Timeout(2, TimeUnit.MINUTES);
9d979fda 57
47c79d9f
AM
58 private static final String testString = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
59 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
60 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
61
9d979fda
MK
62 private BufferedBlockingQueue<Character> charQueue;
63
64 /**
65 * Test setup
66 */
67 @Before
68 public void init() {
69 charQueue = new BufferedBlockingQueue<>(15, 15);
70 }
71
289a287e
MK
72 /**
73 * Test with chunkSize = 1 and buffer queueSize = 1
74 */
75 public void testValidConstructor1() {
76 assertNotNull(new BufferedBlockingQueue<>(1, 1));
77 }
78
79 /**
80 * Test with chunkSize = 0 and buffer queueSize = 0
81 * <p>
82 * Should fail with an {@link IllegalArgumentException}
83 */
84 @Test(expected = IllegalArgumentException.class)
85 public void testInvalidConstructor1() {
86 assertNotNull(new BufferedBlockingQueue<>(0, 0));
87 }
88
89 /**
90 * Test with chunkSize = 0 and buffer queueSize = 1
91 * <p>
92 * Should fail with an {@link IllegalArgumentException}
93 */
94 @Test(expected = IllegalArgumentException.class)
95 public void testInvalidConstructor2() {
96 assertNotNull(new BufferedBlockingQueue<>(1, 0));
97 }
98
99 /**
100 * Test with chunkSize = 1 and buffer queueSize = 0
101 */
102 public void testInvalidConstructor3() {
103 assertNotNull(new BufferedBlockingQueue<>(0, 1));
104 }
105
106 /**
107 * Test with chunkSize = 1 and buffer queueSize =-1
108 * <p>
109 * Should fail with an {@link IllegalArgumentException}
110 */
111 @Test(expected = IllegalArgumentException.class)
112 public void testInvalidConstructor4() {
113 assertNotNull(new BufferedBlockingQueue<>(-1, 1));
114 }
115
116 /**
117 * Test with chunkSize = -1 and buffer queueSize = 1
118 * <p>
119 * Should fail with an {@link IllegalArgumentException}
120 */
121 @Test(expected = IllegalArgumentException.class)
122 public void testInvalidConstructor5() {
123 assertNotNull(new BufferedBlockingQueue<>(1, -1));
124 }
125
9d979fda
MK
126 /**
127 * Test inserting one element and removing it.
128 */
129 @Test
130 public void testSingleInsertion() {
131 Character element = 'x';
132 charQueue.put(element);
133 charQueue.flushInputBuffer();
134
135 Character out = charQueue.take();
136 assertEquals(element, out);
137 }
138
139 /**
140 * Test insertion of elements that fit into the input buffer.
141 */
142 @Test
143 public void testSimpleInsertion() {
144 String string = "Hello world!";
145 for (char elem : string.toCharArray()) {
146 charQueue.put(elem);
147 }
148 charQueue.flushInputBuffer();
149
150 StringBuilder sb = new StringBuilder();
151 while (!charQueue.isEmpty()) {
152 sb.append(charQueue.take());
153 }
154 assertEquals(string, sb.toString());
155 }
156
157 /**
158 * Test insertion of elements that will require more than one input buffer.
159 */
160 @Test
161 public void testLargeInsertion() {
162 String string = testString.substring(0, 222);
163 for (char elem : string.toCharArray()) {
164 charQueue.put(elem);
165 }
166 charQueue.flushInputBuffer();
167
168 StringBuilder sb = new StringBuilder();
169 while (!charQueue.isEmpty()) {
170 sb.append(charQueue.take());
171 }
172 assertEquals(string, sb.toString());
173 }
174
175 /**
176 * Test the state of the {@link BufferedBlockingQueue#isEmpty()} method at
177 * various moments.
178 */
179 @Test
180 public void testIsEmpty() {
181 BufferedBlockingQueue<String> stringQueue = new BufferedBlockingQueue<>(15, 15);
182 assertTrue(stringQueue.isEmpty());
183
184 stringQueue.put("Hello");
185 assertFalse(stringQueue.isEmpty());
186
187 stringQueue.flushInputBuffer();
188 assertFalse(stringQueue.isEmpty());
189
190 stringQueue.flushInputBuffer();
191 assertFalse(stringQueue.isEmpty());
192
193 stringQueue.flushInputBuffer();
194 stringQueue.take();
195 assertTrue(stringQueue.isEmpty());
196
197 stringQueue.flushInputBuffer();
198 assertTrue(stringQueue.isEmpty());
199 }
200
201 /**
202 * Write random data in and read it, several times.
203 */
204 @Test
205 public void testOddInsertions() {
206 BufferedBlockingQueue<Object> objectQueue = new BufferedBlockingQueue<>(15, 15);
207 LinkedList<Object> expectedValues = new LinkedList<>();
208 Random rnd = new Random();
209 rnd.setSeed(123);
210
211 for (int i = 0; i < 10; i++) {
212 /*
213 * The queue's total size is 225 (15x15). We must make sure to not
214 * fill it up here!
215 */
216 for (int j = 0; j < 50; j++) {
0e4f957e
AM
217 Integer testInt = rnd.nextInt();
218 Long testLong = rnd.nextLong();
219 Double testDouble = rnd.nextDouble();
220 Double testGaussian = rnd.nextGaussian();
9d979fda
MK
221
222 expectedValues.add(testInt);
223 expectedValues.add(testLong);
224 expectedValues.add(testDouble);
225 expectedValues.add(testGaussian);
226 objectQueue.put(testInt);
227 objectQueue.put(testLong);
228 objectQueue.put(testDouble);
229 objectQueue.put(testGaussian);
230 }
231 objectQueue.flushInputBuffer();
232
233 while (!expectedValues.isEmpty()) {
234 Object expected = expectedValues.removeFirst();
235 Object actual = objectQueue.take();
236 assertEquals(expected, actual);
237 }
238 }
239 }
240
241 /**
242 * Read with a producer and a consumer
243 *
244 * @throws InterruptedException
245 * The test was interrupted
246 */
247 @Test
248 public void testMultiThread() throws InterruptedException {
249 /* A character not found in the test string */
250 final Character lastElement = '%';
251
252 Thread producer = new Thread() {
253 @Override
254 public void run() {
255 for (char c : testString.toCharArray()) {
256 charQueue.put(c);
257 }
258 charQueue.put(lastElement);
259 charQueue.flushInputBuffer();
260 }
261 };
262 producer.start();
263
264 Thread consumer = new Thread() {
265 @Override
266 public void run() {
267 Character s = charQueue.take();
268 while (!s.equals(lastElement)) {
269 s = charQueue.take();
270 }
271 }
272 };
273 consumer.start();
274
275 consumer.join();
276 producer.join();
277 }
278
49698f83
BH
279 /**
280 * Read with a producer and a consumer using
281 * {@link BufferedBlockingQueue#blockingPeek()}.
282 *
283 * @throws InterruptedException
284 * The test was interrupted
285 */
286 @Test
287 public void testBlockingPeek() throws InterruptedException {
288 /* A character not found in the test string */
289 final Character lastElement = '%';
290
291 final StringBuilder sb = new StringBuilder();
292
293 Thread consumer = new Thread() {
294 @Override
295 public void run() {
296 boolean isFinished = false;
297 while (!isFinished) {
298 // Read last element without removing it
299 Character s = charQueue.blockingPeek();
300 isFinished = s.equals(lastElement);
301 if (!isFinished) {
302 sb.append(s);
303 }
304 // Remove element
305 charQueue.take();
306 }
307 }
308 };
309 consumer.start();
310
311 Thread producer = new Thread() {
312 @Override
313 public void run() {
314 for (char c : testString.toCharArray()) {
315 charQueue.put(c);
316 }
317 charQueue.put(lastElement);
318 charQueue.flushInputBuffer();
319 }
320 };
321 producer.start();
322
323 producer.join();
324 consumer.join();
325
326 assertEquals(testString, sb.toString());
327 }
328
47c79d9f
AM
329 /**
330 * Test the contents returned by {@link BufferedBlockingQueue#iterator()}.
331 *
332 * The test is sequential, because the iterator has no guarantee wrt to its
333 * contents when run concurrently.
334 */
335 @Test
336 public void testIteratorContents() {
337 Deque<Character> expected = new LinkedList<>();
338
339 /* Iterator should be empty initially */
340 assertFalse(charQueue.iterator().hasNext());
341
342 /* Insert the first 50 elements */
343 for (int i = 0; i < 50; i++) {
344 char c = testString.charAt(i);
345 charQueue.put(c);
346 expected.addFirst(c);
347 }
348 LinkedList<Character> actual = new LinkedList<>();
349 Iterators.addAll(actual, charQueue.iterator());
350 assertSameElements(expected, actual);
351
352 /*
353 * Insert more elements, flush the input buffer (should not affect the
354 * iteration).
355 */
356 for (int i = 50; i < 60; i++) {
357 char c = testString.charAt(i);
358 charQueue.put(c);
359 charQueue.flushInputBuffer();
360 expected.addFirst(c);
361 }
362 actual = new LinkedList<>();
363 Iterators.addAll(actual, charQueue.iterator());
364 assertSameElements(expected, actual);
365
366 /* Consume the 30 last elements from the queue */
367 for (int i = 0; i < 30; i++) {
368 charQueue.take();
369 expected.removeLast();
370 }
371 actual = new LinkedList<>();
372 Iterators.addAll(actual, charQueue.iterator());
373 assertSameElements(expected, actual);
374
375 /* Now empty the queue */
376 while (!charQueue.isEmpty()) {
377 charQueue.take();
378 expected.removeLast();
379 }
380 assertFalse(charQueue.iterator().hasNext());
381 }
382
383 /**
384 * Utility method to verify that two collections contain the exact same
385 * elements, not necessarily in the same iteration order.
386 *
387 * {@link Collection#equals} requires the iteration order to be the same,
388 * which we do not want here.
389 *
390 * Using a {@link Set} or {@link Collection#containsAll} is not sufficient
391 * either, because those will throw away duplicate elements.
392 */
393 private static <T> void assertSameElements(Collection<T> c1, Collection<T> c2) {
394 assertEquals(HashMultiset.create(c1), HashMultiset.create(c2));
395 }
396
9d979fda 397 /**
d6e2666b
AM
398 * Test iterating on the queue while a producer and a consumer threads are
399 * using it. The iteration should not affect the elements taken by the
400 * consumer.
9d979fda
MK
401 */
402 @Test
fadd7888 403 public void testConcurrentIteration() {
5b3fe39a 404 final BufferedBlockingQueue<@NonNull String> queue = new BufferedBlockingQueue<>(15, 15);
fadd7888 405 final String poisonPill = "That's all folks!";
9d979fda 406
fadd7888
AM
407 /*
408 * Convert the test's testBuffer into an array of String, one for each
bbadfd0a 409 * character.
fadd7888 410 */
bbadfd0a
AM
411 List<String> strings = Chars.asList(testString.toCharArray()).stream()
412 .map(Object::toString)
413 .collect(Collectors.toList());
9d979fda 414
fadd7888
AM
415 Iterable<Iterable<String>> results =
416 runConcurrencyTest(queue, strings, poisonPill, 1, 1, 1);
417
418 assertEquals(strings, Iterables.getOnlyElement(results));
419 }
9d979fda 420
fadd7888
AM
421 /**
422 * Run a concurrency test on a {@link BufferedBlockingQueue}, with the
423 * specified number of producer, consumer and observer/iterator threads.
424 *
425 * The returned value represents the elements consumed by each consumer
426 * thread. Thus, if there is one consumer, the top-level {@link Iterable}
427 * will be of size 1, and the inner one should contain all the elements that
428 * were inserted.
429 *
430 * @param queue
431 * The queue to run the test on
432 * @param testBuffer
433 * The data set to insert in the queue. Every producer will
434 * insert one entire set.
435 * @param poisonPill
436 * The "poison pill" to indicate the end. Simply make sure it is
437 * a element of type <T> that is not present in the 'testBuffer'.
438 * @param nbProducerThreads
439 * Number of producer threads. There should be at least 1.
440 * @param nbConsumerThreads
441 * Number of consumer threads. There should be at least 1.
442 * @param nbObserverThreads
443 * Number of observer threads. It should be >= 0.
444 * @return The consumed elements, as seen by each consumer thread.
445 */
5b3fe39a 446 private static <T> Iterable<Iterable<T>> runConcurrencyTest(final BufferedBlockingQueue<@NonNull T> queue,
fadd7888
AM
447 final List<T> testBuffer,
448 final @NonNull T poisonPill,
449 int nbProducerThreads,
450 int nbConsumerThreads,
451 int nbObserverThreads) {
452
453 final class ProducerThread implements Runnable {
9d979fda
MK
454 @Override
455 public void run() {
fadd7888
AM
456 for (int i = 0; i < testBuffer.size(); i++) {
457 T elem = testBuffer.get(i);
458 if (elem == null) {
459 // TODO replace with List<@NonNull T> once we can
460 throw new IllegalArgumentException();
461 }
462 queue.put(elem);
9d979fda 463 }
d6e2666b
AM
464 queue.put(poisonPill);
465 queue.flushInputBuffer();
9d979fda 466 }
fadd7888 467 }
9d979fda 468
fadd7888
AM
469 /**
470 * The consumer thread will return the elements it read via its Future.
471 *
472 * Note that if there are multiple consumers, there is no guarantee with
473 * regards the contents of an individual one.
474 */
475 final class ConsumerThread implements Callable<Iterable<T>> {
9d979fda 476 @Override
fadd7888
AM
477 public Iterable<T> call() {
478 List<T> results = new LinkedList<>();
479 T elem = queue.take();
480 while (!elem.equals(poisonPill)) {
481 results.add(elem);
482 elem = queue.take();
9d979fda 483 }
fadd7888 484 return results;
9d979fda 485 }
fadd7888 486 }
9d979fda 487
fadd7888 488 final class ObserverThread implements Runnable {
9d979fda
MK
489 @Override
490 public void run() {
fadd7888
AM
491 for (int i = 0; i < 5; i++) {
492 final Set<T> results = new HashSet<>();
493 for (T input : queue) {
494 /*
495 * Do something with the element so that this iteration
496 * does not get optimized out.
497 */
d6e2666b 498 results.add(input);
9d979fda
MK
499 }
500 }
9d979fda 501 }
fadd7888 502 }
9d979fda 503
fadd7888
AM
504 if (nbProducerThreads < 1 || nbConsumerThreads < 1 || nbObserverThreads < 0) {
505 throw new IllegalArgumentException();
506 }
9d979fda 507
fadd7888
AM
508 final ExecutorService pool = Executors.newFixedThreadPool(
509 nbProducerThreads + nbConsumerThreads + nbObserverThreads);
9d979fda 510
fadd7888
AM
511 /* Consumed elements, per consumer thread */
512 List<Future<Iterable<T>>> consumedElements = new LinkedList<>();
9d979fda 513
fadd7888
AM
514 for (int i = 0; i < nbProducerThreads; i++) {
515 pool.submit(new ProducerThread());
516 }
517 for (int i = 0; i < nbConsumerThreads; i++) {
518 consumedElements.add(pool.submit(new ConsumerThread()));
519 }
520 for (int i = 0; i < nbObserverThreads; i++) {
521 pool.submit(new ObserverThread());
522 }
523
524 List<Iterable<T>> results = new LinkedList<>();
525 try {
526 /* Convert the Future's to the actual return value */
527 for (Future<Iterable<T>> future : consumedElements) {
528 Iterable<T> threadResult = future.get();
529 results.add(threadResult);
530 }
531
532 pool.shutdown();
533 boolean success = pool.awaitTermination(2, TimeUnit.MINUTES);
534 if (!success) {
535 throw new InterruptedException();
536 }
537
538 } catch (ExecutionException | InterruptedException e) {
539 fail(e.getMessage());
540 }
541
542 return results;
543 }
47c79d9f 544
fadd7888 545}
This page took 0.063264 seconds and 5 git commands to generate.