releng: Transition to jdt.annotation 2.0
[deliverable/tracecompass.git] / common / org.eclipse.tracecompass.common.core.tests / src / org / eclipse / tracecompass / common / core / tests / collect / BufferedBlockingQueueTest.java
CommitLineData
9d979fda
MK
1/*******************************************************************************
2 * Copyright (c) 2015 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Matthew Khouzam - Initial API and implementation
11 *******************************************************************************/
12
13package org.eclipse.tracecompass.common.core.tests.collect;
14
9d979fda
MK
15import static org.junit.Assert.assertEquals;
16import static org.junit.Assert.assertFalse;
17import static org.junit.Assert.assertTrue;
fadd7888 18import static org.junit.Assert.fail;
9d979fda 19
47c79d9f
AM
20import java.util.Collection;
21import java.util.Deque;
d6e2666b 22import java.util.HashSet;
9d979fda 23import java.util.LinkedList;
fadd7888 24import java.util.List;
9d979fda 25import java.util.Random;
d6e2666b 26import java.util.Set;
9d979fda
MK
27import java.util.concurrent.Callable;
28import java.util.concurrent.ExecutionException;
29import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
31import java.util.concurrent.Future;
32import java.util.concurrent.TimeUnit;
33
fadd7888 34import org.eclipse.jdt.annotation.NonNull;
9d979fda
MK
35import org.eclipse.tracecompass.common.core.NonNullUtils;
36import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue;
37import org.junit.Before;
38import org.junit.Rule;
39import org.junit.Test;
40import org.junit.rules.TestRule;
41import org.junit.rules.Timeout;
42
fadd7888
AM
43import com.google.common.base.Functions;
44import com.google.common.collect.FluentIterable;
47c79d9f 45import com.google.common.collect.HashMultiset;
fadd7888 46import com.google.common.collect.Iterables;
47c79d9f 47import com.google.common.collect.Iterators;
fadd7888 48import com.google.common.primitives.Chars;
47c79d9f 49
9d979fda
MK
50/**
51 * Test suite for the {@link BufferedBlockingQueue}
52 */
53public class BufferedBlockingQueueTest {
54
55 /** Timeout the tests after 2 minutes */
56 @Rule
d291a715 57 public TestRule timeoutRule = new Timeout(2, TimeUnit.MINUTES);
9d979fda 58
47c79d9f
AM
59 private static final String testString = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
60 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +
61 "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
62
9d979fda
MK
63 private BufferedBlockingQueue<Character> charQueue;
64
65 /**
66 * Test setup
67 */
68 @Before
69 public void init() {
70 charQueue = new BufferedBlockingQueue<>(15, 15);
71 }
72
73 /**
74 * Test inserting one element and removing it.
75 */
76 @Test
77 public void testSingleInsertion() {
78 Character element = 'x';
79 charQueue.put(element);
80 charQueue.flushInputBuffer();
81
82 Character out = charQueue.take();
83 assertEquals(element, out);
84 }
85
86 /**
87 * Test insertion of elements that fit into the input buffer.
88 */
89 @Test
90 public void testSimpleInsertion() {
91 String string = "Hello world!";
92 for (char elem : string.toCharArray()) {
93 charQueue.put(elem);
94 }
95 charQueue.flushInputBuffer();
96
97 StringBuilder sb = new StringBuilder();
98 while (!charQueue.isEmpty()) {
99 sb.append(charQueue.take());
100 }
101 assertEquals(string, sb.toString());
102 }
103
104 /**
105 * Test insertion of elements that will require more than one input buffer.
106 */
107 @Test
108 public void testLargeInsertion() {
109 String string = testString.substring(0, 222);
110 for (char elem : string.toCharArray()) {
111 charQueue.put(elem);
112 }
113 charQueue.flushInputBuffer();
114
115 StringBuilder sb = new StringBuilder();
116 while (!charQueue.isEmpty()) {
117 sb.append(charQueue.take());
118 }
119 assertEquals(string, sb.toString());
120 }
121
122 /**
123 * Test the state of the {@link BufferedBlockingQueue#isEmpty()} method at
124 * various moments.
125 */
126 @Test
127 public void testIsEmpty() {
128 BufferedBlockingQueue<String> stringQueue = new BufferedBlockingQueue<>(15, 15);
129 assertTrue(stringQueue.isEmpty());
130
131 stringQueue.put("Hello");
132 assertFalse(stringQueue.isEmpty());
133
134 stringQueue.flushInputBuffer();
135 assertFalse(stringQueue.isEmpty());
136
137 stringQueue.flushInputBuffer();
138 assertFalse(stringQueue.isEmpty());
139
140 stringQueue.flushInputBuffer();
141 stringQueue.take();
142 assertTrue(stringQueue.isEmpty());
143
144 stringQueue.flushInputBuffer();
145 assertTrue(stringQueue.isEmpty());
146 }
147
148 /**
149 * Write random data in and read it, several times.
150 */
151 @Test
152 public void testOddInsertions() {
153 BufferedBlockingQueue<Object> objectQueue = new BufferedBlockingQueue<>(15, 15);
154 LinkedList<Object> expectedValues = new LinkedList<>();
155 Random rnd = new Random();
156 rnd.setSeed(123);
157
158 for (int i = 0; i < 10; i++) {
159 /*
160 * The queue's total size is 225 (15x15). We must make sure to not
161 * fill it up here!
162 */
163 for (int j = 0; j < 50; j++) {
164 Integer testInt = NonNullUtils.checkNotNull(rnd.nextInt());
165 Long testLong = NonNullUtils.checkNotNull(rnd.nextLong());
166 Double testDouble = NonNullUtils.checkNotNull(rnd.nextDouble());
167 Double testGaussian = NonNullUtils.checkNotNull(rnd.nextGaussian());
168
169 expectedValues.add(testInt);
170 expectedValues.add(testLong);
171 expectedValues.add(testDouble);
172 expectedValues.add(testGaussian);
173 objectQueue.put(testInt);
174 objectQueue.put(testLong);
175 objectQueue.put(testDouble);
176 objectQueue.put(testGaussian);
177 }
178 objectQueue.flushInputBuffer();
179
180 while (!expectedValues.isEmpty()) {
181 Object expected = expectedValues.removeFirst();
182 Object actual = objectQueue.take();
183 assertEquals(expected, actual);
184 }
185 }
186 }
187
188 /**
189 * Read with a producer and a consumer
190 *
191 * @throws InterruptedException
192 * The test was interrupted
193 */
194 @Test
195 public void testMultiThread() throws InterruptedException {
196 /* A character not found in the test string */
197 final Character lastElement = '%';
198
199 Thread producer = new Thread() {
200 @Override
201 public void run() {
202 for (char c : testString.toCharArray()) {
203 charQueue.put(c);
204 }
205 charQueue.put(lastElement);
206 charQueue.flushInputBuffer();
207 }
208 };
209 producer.start();
210
211 Thread consumer = new Thread() {
212 @Override
213 public void run() {
214 Character s = charQueue.take();
215 while (!s.equals(lastElement)) {
216 s = charQueue.take();
217 }
218 }
219 };
220 consumer.start();
221
222 consumer.join();
223 producer.join();
224 }
225
49698f83
BH
226 /**
227 * Read with a producer and a consumer using
228 * {@link BufferedBlockingQueue#blockingPeek()}.
229 *
230 * @throws InterruptedException
231 * The test was interrupted
232 */
233 @Test
234 public void testBlockingPeek() throws InterruptedException {
235 /* A character not found in the test string */
236 final Character lastElement = '%';
237
238 final StringBuilder sb = new StringBuilder();
239
240 Thread consumer = new Thread() {
241 @Override
242 public void run() {
243 boolean isFinished = false;
244 while (!isFinished) {
245 // Read last element without removing it
246 Character s = charQueue.blockingPeek();
247 isFinished = s.equals(lastElement);
248 if (!isFinished) {
249 sb.append(s);
250 }
251 // Remove element
252 charQueue.take();
253 }
254 }
255 };
256 consumer.start();
257
258 Thread producer = new Thread() {
259 @Override
260 public void run() {
261 for (char c : testString.toCharArray()) {
262 charQueue.put(c);
263 }
264 charQueue.put(lastElement);
265 charQueue.flushInputBuffer();
266 }
267 };
268 producer.start();
269
270 producer.join();
271 consumer.join();
272
273 assertEquals(testString, sb.toString());
274 }
275
47c79d9f
AM
276 /**
277 * Test the contents returned by {@link BufferedBlockingQueue#iterator()}.
278 *
279 * The test is sequential, because the iterator has no guarantee wrt to its
280 * contents when run concurrently.
281 */
282 @Test
283 public void testIteratorContents() {
284 Deque<Character> expected = new LinkedList<>();
285
286 /* Iterator should be empty initially */
287 assertFalse(charQueue.iterator().hasNext());
288
289 /* Insert the first 50 elements */
290 for (int i = 0; i < 50; i++) {
291 char c = testString.charAt(i);
292 charQueue.put(c);
293 expected.addFirst(c);
294 }
295 LinkedList<Character> actual = new LinkedList<>();
296 Iterators.addAll(actual, charQueue.iterator());
297 assertSameElements(expected, actual);
298
299 /*
300 * Insert more elements, flush the input buffer (should not affect the
301 * iteration).
302 */
303 for (int i = 50; i < 60; i++) {
304 char c = testString.charAt(i);
305 charQueue.put(c);
306 charQueue.flushInputBuffer();
307 expected.addFirst(c);
308 }
309 actual = new LinkedList<>();
310 Iterators.addAll(actual, charQueue.iterator());
311 assertSameElements(expected, actual);
312
313 /* Consume the 30 last elements from the queue */
314 for (int i = 0; i < 30; i++) {
315 charQueue.take();
316 expected.removeLast();
317 }
318 actual = new LinkedList<>();
319 Iterators.addAll(actual, charQueue.iterator());
320 assertSameElements(expected, actual);
321
322 /* Now empty the queue */
323 while (!charQueue.isEmpty()) {
324 charQueue.take();
325 expected.removeLast();
326 }
327 assertFalse(charQueue.iterator().hasNext());
328 }
329
330 /**
331 * Utility method to verify that two collections contain the exact same
332 * elements, not necessarily in the same iteration order.
333 *
334 * {@link Collection#equals} requires the iteration order to be the same,
335 * which we do not want here.
336 *
337 * Using a {@link Set} or {@link Collection#containsAll} is not sufficient
338 * either, because those will throw away duplicate elements.
339 */
340 private static <T> void assertSameElements(Collection<T> c1, Collection<T> c2) {
341 assertEquals(HashMultiset.create(c1), HashMultiset.create(c2));
342 }
343
9d979fda 344 /**
d6e2666b
AM
345 * Test iterating on the queue while a producer and a consumer threads are
346 * using it. The iteration should not affect the elements taken by the
347 * consumer.
9d979fda
MK
348 */
349 @Test
fadd7888 350 public void testConcurrentIteration() {
d6e2666b 351 final BufferedBlockingQueue<String> queue = new BufferedBlockingQueue<>(15, 15);
fadd7888 352 final String poisonPill = "That's all folks!";
9d979fda 353
fadd7888
AM
354 /*
355 * Convert the test's testBuffer into an array of String, one for each
356 * character. There are probably simpler ways of doing this, but it
357 * would not look as impressive.
358 */
359 FluentIterable<Character> fi = FluentIterable.from(Chars.asList(testString.toCharArray()));
360 List<String> strings = fi.transform(Functions.toStringFunction()).toList();
9d979fda 361
fadd7888
AM
362 Iterable<Iterable<String>> results =
363 runConcurrencyTest(queue, strings, poisonPill, 1, 1, 1);
364
365 assertEquals(strings, Iterables.getOnlyElement(results));
366 }
9d979fda 367
fadd7888
AM
368 /**
369 * Run a concurrency test on a {@link BufferedBlockingQueue}, with the
370 * specified number of producer, consumer and observer/iterator threads.
371 *
372 * The returned value represents the elements consumed by each consumer
373 * thread. Thus, if there is one consumer, the top-level {@link Iterable}
374 * will be of size 1, and the inner one should contain all the elements that
375 * were inserted.
376 *
377 * @param queue
378 * The queue to run the test on
379 * @param testBuffer
380 * The data set to insert in the queue. Every producer will
381 * insert one entire set.
382 * @param poisonPill
383 * The "poison pill" to indicate the end. Simply make sure it is
384 * a element of type <T> that is not present in the 'testBuffer'.
385 * @param nbProducerThreads
386 * Number of producer threads. There should be at least 1.
387 * @param nbConsumerThreads
388 * Number of consumer threads. There should be at least 1.
389 * @param nbObserverThreads
390 * Number of observer threads. It should be >= 0.
391 * @return The consumed elements, as seen by each consumer thread.
392 */
393 private static <T> Iterable<Iterable<T>> runConcurrencyTest(final BufferedBlockingQueue<T> queue,
394 final List<T> testBuffer,
395 final @NonNull T poisonPill,
396 int nbProducerThreads,
397 int nbConsumerThreads,
398 int nbObserverThreads) {
399
400 final class ProducerThread implements Runnable {
9d979fda
MK
401 @Override
402 public void run() {
fadd7888
AM
403 for (int i = 0; i < testBuffer.size(); i++) {
404 T elem = testBuffer.get(i);
405 if (elem == null) {
406 // TODO replace with List<@NonNull T> once we can
407 throw new IllegalArgumentException();
408 }
409 queue.put(elem);
9d979fda 410 }
d6e2666b
AM
411 queue.put(poisonPill);
412 queue.flushInputBuffer();
9d979fda 413 }
fadd7888 414 }
9d979fda 415
fadd7888
AM
416 /**
417 * The consumer thread will return the elements it read via its Future.
418 *
419 * Note that if there are multiple consumers, there is no guarantee with
420 * regards the contents of an individual one.
421 */
422 final class ConsumerThread implements Callable<Iterable<T>> {
9d979fda 423 @Override
fadd7888
AM
424 public Iterable<T> call() {
425 List<T> results = new LinkedList<>();
426 T elem = queue.take();
427 while (!elem.equals(poisonPill)) {
428 results.add(elem);
429 elem = queue.take();
9d979fda 430 }
fadd7888 431 return results;
9d979fda 432 }
fadd7888 433 }
9d979fda 434
fadd7888 435 final class ObserverThread implements Runnable {
9d979fda
MK
436 @Override
437 public void run() {
fadd7888
AM
438 for (int i = 0; i < 5; i++) {
439 final Set<T> results = new HashSet<>();
440 for (T input : queue) {
441 /*
442 * Do something with the element so that this iteration
443 * does not get optimized out.
444 */
d6e2666b 445 results.add(input);
9d979fda
MK
446 }
447 }
9d979fda 448 }
fadd7888 449 }
9d979fda 450
fadd7888
AM
451 if (nbProducerThreads < 1 || nbConsumerThreads < 1 || nbObserverThreads < 0) {
452 throw new IllegalArgumentException();
453 }
9d979fda 454
fadd7888
AM
455 final ExecutorService pool = Executors.newFixedThreadPool(
456 nbProducerThreads + nbConsumerThreads + nbObserverThreads);
9d979fda 457
fadd7888
AM
458 /* Consumed elements, per consumer thread */
459 List<Future<Iterable<T>>> consumedElements = new LinkedList<>();
9d979fda 460
fadd7888
AM
461 for (int i = 0; i < nbProducerThreads; i++) {
462 pool.submit(new ProducerThread());
463 }
464 for (int i = 0; i < nbConsumerThreads; i++) {
465 consumedElements.add(pool.submit(new ConsumerThread()));
466 }
467 for (int i = 0; i < nbObserverThreads; i++) {
468 pool.submit(new ObserverThread());
469 }
470
471 List<Iterable<T>> results = new LinkedList<>();
472 try {
473 /* Convert the Future's to the actual return value */
474 for (Future<Iterable<T>> future : consumedElements) {
475 Iterable<T> threadResult = future.get();
476 results.add(threadResult);
477 }
478
479 pool.shutdown();
480 boolean success = pool.awaitTermination(2, TimeUnit.MINUTES);
481 if (!success) {
482 throw new InterruptedException();
483 }
484
485 } catch (ExecutionException | InterruptedException e) {
486 fail(e.getMessage());
487 }
488
489 return results;
490 }
47c79d9f 491
fadd7888 492}
This page took 0.129646 seconds and 5 git commands to generate.