Commit | Line | Data |
---|---|---|
9d979fda MK |
1 | /******************************************************************************* |
2 | * Copyright (c) 2015 Ericsson | |
3 | * | |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Matthew Khouzam - Initial API and implementation | |
11 | *******************************************************************************/ | |
12 | ||
13 | package org.eclipse.tracecompass.common.core.tests.collect; | |
14 | ||
9d979fda MK |
15 | import static org.junit.Assert.assertEquals; |
16 | import static org.junit.Assert.assertFalse; | |
17 | import static org.junit.Assert.assertTrue; | |
289a287e | 18 | import static org.junit.Assert.assertNotNull; |
fadd7888 | 19 | import static org.junit.Assert.fail; |
9d979fda | 20 | |
47c79d9f AM |
21 | import java.util.Collection; |
22 | import java.util.Deque; | |
d6e2666b | 23 | import java.util.HashSet; |
9d979fda | 24 | import java.util.LinkedList; |
fadd7888 | 25 | import java.util.List; |
9d979fda | 26 | import java.util.Random; |
d6e2666b | 27 | import java.util.Set; |
9d979fda MK |
28 | import java.util.concurrent.Callable; |
29 | import java.util.concurrent.ExecutionException; | |
30 | import java.util.concurrent.ExecutorService; | |
31 | import java.util.concurrent.Executors; | |
32 | import java.util.concurrent.Future; | |
33 | import java.util.concurrent.TimeUnit; | |
bbadfd0a | 34 | import java.util.stream.Collectors; |
9d979fda | 35 | |
fadd7888 | 36 | import org.eclipse.jdt.annotation.NonNull; |
9d979fda MK |
37 | import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue; |
38 | import org.junit.Before; | |
39 | import org.junit.Rule; | |
40 | import org.junit.Test; | |
41 | import org.junit.rules.TestRule; | |
42 | import org.junit.rules.Timeout; | |
43 | ||
47c79d9f | 44 | import com.google.common.collect.HashMultiset; |
fadd7888 | 45 | import com.google.common.collect.Iterables; |
47c79d9f | 46 | import com.google.common.collect.Iterators; |
fadd7888 | 47 | import com.google.common.primitives.Chars; |
47c79d9f | 48 | |
9d979fda MK |
49 | /** |
50 | * Test suite for the {@link BufferedBlockingQueue} | |
51 | */ | |
52 | public class BufferedBlockingQueueTest { | |
53 | ||
54 | /** Timeout the tests after 2 minutes */ | |
55 | @Rule | |
d291a715 | 56 | public TestRule timeoutRule = new Timeout(2, TimeUnit.MINUTES); |
9d979fda | 57 | |
47c79d9f AM |
58 | private static final String testString = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + |
59 | "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + | |
60 | "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz"; | |
61 | ||
9d979fda MK |
62 | private BufferedBlockingQueue<Character> charQueue; |
63 | ||
64 | /** | |
65 | * Test setup | |
66 | */ | |
67 | @Before | |
68 | public void init() { | |
69 | charQueue = new BufferedBlockingQueue<>(15, 15); | |
70 | } | |
71 | ||
289a287e MK |
72 | /** |
73 | * Test with chunkSize = 1 and buffer queueSize = 1 | |
74 | */ | |
75 | public void testValidConstructor1() { | |
76 | assertNotNull(new BufferedBlockingQueue<>(1, 1)); | |
77 | } | |
78 | ||
79 | /** | |
80 | * Test with chunkSize = 0 and buffer queueSize = 0 | |
81 | * <p> | |
82 | * Should fail with an {@link IllegalArgumentException} | |
83 | */ | |
84 | @Test(expected = IllegalArgumentException.class) | |
85 | public void testInvalidConstructor1() { | |
86 | assertNotNull(new BufferedBlockingQueue<>(0, 0)); | |
87 | } | |
88 | ||
89 | /** | |
90 | * Test with chunkSize = 0 and buffer queueSize = 1 | |
91 | * <p> | |
92 | * Should fail with an {@link IllegalArgumentException} | |
93 | */ | |
94 | @Test(expected = IllegalArgumentException.class) | |
95 | public void testInvalidConstructor2() { | |
96 | assertNotNull(new BufferedBlockingQueue<>(1, 0)); | |
97 | } | |
98 | ||
99 | /** | |
100 | * Test with chunkSize = 1 and buffer queueSize = 0 | |
101 | */ | |
102 | public void testInvalidConstructor3() { | |
103 | assertNotNull(new BufferedBlockingQueue<>(0, 1)); | |
104 | } | |
105 | ||
106 | /** | |
107 | * Test with chunkSize = 1 and buffer queueSize =-1 | |
108 | * <p> | |
109 | * Should fail with an {@link IllegalArgumentException} | |
110 | */ | |
111 | @Test(expected = IllegalArgumentException.class) | |
112 | public void testInvalidConstructor4() { | |
113 | assertNotNull(new BufferedBlockingQueue<>(-1, 1)); | |
114 | } | |
115 | ||
116 | /** | |
117 | * Test with chunkSize = -1 and buffer queueSize = 1 | |
118 | * <p> | |
119 | * Should fail with an {@link IllegalArgumentException} | |
120 | */ | |
121 | @Test(expected = IllegalArgumentException.class) | |
122 | public void testInvalidConstructor5() { | |
123 | assertNotNull(new BufferedBlockingQueue<>(1, -1)); | |
124 | } | |
125 | ||
9d979fda MK |
126 | /** |
127 | * Test inserting one element and removing it. | |
128 | */ | |
129 | @Test | |
130 | public void testSingleInsertion() { | |
131 | Character element = 'x'; | |
132 | charQueue.put(element); | |
133 | charQueue.flushInputBuffer(); | |
134 | ||
135 | Character out = charQueue.take(); | |
136 | assertEquals(element, out); | |
137 | } | |
138 | ||
139 | /** | |
140 | * Test insertion of elements that fit into the input buffer. | |
141 | */ | |
142 | @Test | |
143 | public void testSimpleInsertion() { | |
144 | String string = "Hello world!"; | |
145 | for (char elem : string.toCharArray()) { | |
146 | charQueue.put(elem); | |
147 | } | |
148 | charQueue.flushInputBuffer(); | |
149 | ||
150 | StringBuilder sb = new StringBuilder(); | |
151 | while (!charQueue.isEmpty()) { | |
152 | sb.append(charQueue.take()); | |
153 | } | |
154 | assertEquals(string, sb.toString()); | |
155 | } | |
156 | ||
157 | /** | |
158 | * Test insertion of elements that will require more than one input buffer. | |
159 | */ | |
160 | @Test | |
161 | public void testLargeInsertion() { | |
162 | String string = testString.substring(0, 222); | |
163 | for (char elem : string.toCharArray()) { | |
164 | charQueue.put(elem); | |
165 | } | |
166 | charQueue.flushInputBuffer(); | |
167 | ||
168 | StringBuilder sb = new StringBuilder(); | |
169 | while (!charQueue.isEmpty()) { | |
170 | sb.append(charQueue.take()); | |
171 | } | |
172 | assertEquals(string, sb.toString()); | |
173 | } | |
174 | ||
175 | /** | |
176 | * Test the state of the {@link BufferedBlockingQueue#isEmpty()} method at | |
177 | * various moments. | |
178 | */ | |
179 | @Test | |
180 | public void testIsEmpty() { | |
181 | BufferedBlockingQueue<String> stringQueue = new BufferedBlockingQueue<>(15, 15); | |
182 | assertTrue(stringQueue.isEmpty()); | |
183 | ||
184 | stringQueue.put("Hello"); | |
185 | assertFalse(stringQueue.isEmpty()); | |
186 | ||
187 | stringQueue.flushInputBuffer(); | |
188 | assertFalse(stringQueue.isEmpty()); | |
189 | ||
190 | stringQueue.flushInputBuffer(); | |
191 | assertFalse(stringQueue.isEmpty()); | |
192 | ||
193 | stringQueue.flushInputBuffer(); | |
194 | stringQueue.take(); | |
195 | assertTrue(stringQueue.isEmpty()); | |
196 | ||
197 | stringQueue.flushInputBuffer(); | |
198 | assertTrue(stringQueue.isEmpty()); | |
199 | } | |
200 | ||
201 | /** | |
202 | * Write random data in and read it, several times. | |
203 | */ | |
204 | @Test | |
205 | public void testOddInsertions() { | |
206 | BufferedBlockingQueue<Object> objectQueue = new BufferedBlockingQueue<>(15, 15); | |
207 | LinkedList<Object> expectedValues = new LinkedList<>(); | |
208 | Random rnd = new Random(); | |
209 | rnd.setSeed(123); | |
210 | ||
211 | for (int i = 0; i < 10; i++) { | |
212 | /* | |
213 | * The queue's total size is 225 (15x15). We must make sure to not | |
214 | * fill it up here! | |
215 | */ | |
216 | for (int j = 0; j < 50; j++) { | |
0e4f957e AM |
217 | Integer testInt = rnd.nextInt(); |
218 | Long testLong = rnd.nextLong(); | |
219 | Double testDouble = rnd.nextDouble(); | |
220 | Double testGaussian = rnd.nextGaussian(); | |
9d979fda MK |
221 | |
222 | expectedValues.add(testInt); | |
223 | expectedValues.add(testLong); | |
224 | expectedValues.add(testDouble); | |
225 | expectedValues.add(testGaussian); | |
226 | objectQueue.put(testInt); | |
227 | objectQueue.put(testLong); | |
228 | objectQueue.put(testDouble); | |
229 | objectQueue.put(testGaussian); | |
230 | } | |
231 | objectQueue.flushInputBuffer(); | |
232 | ||
233 | while (!expectedValues.isEmpty()) { | |
234 | Object expected = expectedValues.removeFirst(); | |
235 | Object actual = objectQueue.take(); | |
236 | assertEquals(expected, actual); | |
237 | } | |
238 | } | |
239 | } | |
240 | ||
241 | /** | |
242 | * Read with a producer and a consumer | |
243 | * | |
244 | * @throws InterruptedException | |
245 | * The test was interrupted | |
246 | */ | |
247 | @Test | |
248 | public void testMultiThread() throws InterruptedException { | |
249 | /* A character not found in the test string */ | |
250 | final Character lastElement = '%'; | |
251 | ||
252 | Thread producer = new Thread() { | |
253 | @Override | |
254 | public void run() { | |
255 | for (char c : testString.toCharArray()) { | |
256 | charQueue.put(c); | |
257 | } | |
258 | charQueue.put(lastElement); | |
259 | charQueue.flushInputBuffer(); | |
260 | } | |
261 | }; | |
262 | producer.start(); | |
263 | ||
264 | Thread consumer = new Thread() { | |
265 | @Override | |
266 | public void run() { | |
267 | Character s = charQueue.take(); | |
268 | while (!s.equals(lastElement)) { | |
269 | s = charQueue.take(); | |
270 | } | |
271 | } | |
272 | }; | |
273 | consumer.start(); | |
274 | ||
275 | consumer.join(); | |
276 | producer.join(); | |
277 | } | |
278 | ||
49698f83 BH |
279 | /** |
280 | * Read with a producer and a consumer using | |
281 | * {@link BufferedBlockingQueue#blockingPeek()}. | |
282 | * | |
283 | * @throws InterruptedException | |
284 | * The test was interrupted | |
285 | */ | |
286 | @Test | |
287 | public void testBlockingPeek() throws InterruptedException { | |
288 | /* A character not found in the test string */ | |
289 | final Character lastElement = '%'; | |
290 | ||
291 | final StringBuilder sb = new StringBuilder(); | |
292 | ||
293 | Thread consumer = new Thread() { | |
294 | @Override | |
295 | public void run() { | |
296 | boolean isFinished = false; | |
297 | while (!isFinished) { | |
298 | // Read last element without removing it | |
299 | Character s = charQueue.blockingPeek(); | |
300 | isFinished = s.equals(lastElement); | |
301 | if (!isFinished) { | |
302 | sb.append(s); | |
303 | } | |
304 | // Remove element | |
305 | charQueue.take(); | |
306 | } | |
307 | } | |
308 | }; | |
309 | consumer.start(); | |
310 | ||
311 | Thread producer = new Thread() { | |
312 | @Override | |
313 | public void run() { | |
314 | for (char c : testString.toCharArray()) { | |
315 | charQueue.put(c); | |
316 | } | |
317 | charQueue.put(lastElement); | |
318 | charQueue.flushInputBuffer(); | |
319 | } | |
320 | }; | |
321 | producer.start(); | |
322 | ||
323 | producer.join(); | |
324 | consumer.join(); | |
325 | ||
326 | assertEquals(testString, sb.toString()); | |
327 | } | |
328 | ||
47c79d9f AM |
329 | /** |
330 | * Test the contents returned by {@link BufferedBlockingQueue#iterator()}. | |
331 | * | |
332 | * The test is sequential, because the iterator has no guarantee wrt to its | |
333 | * contents when run concurrently. | |
334 | */ | |
335 | @Test | |
336 | public void testIteratorContents() { | |
337 | Deque<Character> expected = new LinkedList<>(); | |
338 | ||
339 | /* Iterator should be empty initially */ | |
340 | assertFalse(charQueue.iterator().hasNext()); | |
341 | ||
342 | /* Insert the first 50 elements */ | |
343 | for (int i = 0; i < 50; i++) { | |
344 | char c = testString.charAt(i); | |
345 | charQueue.put(c); | |
346 | expected.addFirst(c); | |
347 | } | |
348 | LinkedList<Character> actual = new LinkedList<>(); | |
349 | Iterators.addAll(actual, charQueue.iterator()); | |
350 | assertSameElements(expected, actual); | |
351 | ||
352 | /* | |
353 | * Insert more elements, flush the input buffer (should not affect the | |
354 | * iteration). | |
355 | */ | |
356 | for (int i = 50; i < 60; i++) { | |
357 | char c = testString.charAt(i); | |
358 | charQueue.put(c); | |
359 | charQueue.flushInputBuffer(); | |
360 | expected.addFirst(c); | |
361 | } | |
362 | actual = new LinkedList<>(); | |
363 | Iterators.addAll(actual, charQueue.iterator()); | |
364 | assertSameElements(expected, actual); | |
365 | ||
366 | /* Consume the 30 last elements from the queue */ | |
367 | for (int i = 0; i < 30; i++) { | |
368 | charQueue.take(); | |
369 | expected.removeLast(); | |
370 | } | |
371 | actual = new LinkedList<>(); | |
372 | Iterators.addAll(actual, charQueue.iterator()); | |
373 | assertSameElements(expected, actual); | |
374 | ||
375 | /* Now empty the queue */ | |
376 | while (!charQueue.isEmpty()) { | |
377 | charQueue.take(); | |
378 | expected.removeLast(); | |
379 | } | |
380 | assertFalse(charQueue.iterator().hasNext()); | |
381 | } | |
382 | ||
383 | /** | |
384 | * Utility method to verify that two collections contain the exact same | |
385 | * elements, not necessarily in the same iteration order. | |
386 | * | |
387 | * {@link Collection#equals} requires the iteration order to be the same, | |
388 | * which we do not want here. | |
389 | * | |
390 | * Using a {@link Set} or {@link Collection#containsAll} is not sufficient | |
391 | * either, because those will throw away duplicate elements. | |
392 | */ | |
393 | private static <T> void assertSameElements(Collection<T> c1, Collection<T> c2) { | |
394 | assertEquals(HashMultiset.create(c1), HashMultiset.create(c2)); | |
395 | } | |
396 | ||
9d979fda | 397 | /** |
d6e2666b AM |
398 | * Test iterating on the queue while a producer and a consumer threads are |
399 | * using it. The iteration should not affect the elements taken by the | |
400 | * consumer. | |
9d979fda MK |
401 | */ |
402 | @Test | |
fadd7888 | 403 | public void testConcurrentIteration() { |
5b3fe39a | 404 | final BufferedBlockingQueue<@NonNull String> queue = new BufferedBlockingQueue<>(15, 15); |
fadd7888 | 405 | final String poisonPill = "That's all folks!"; |
9d979fda | 406 | |
fadd7888 AM |
407 | /* |
408 | * Convert the test's testBuffer into an array of String, one for each | |
bbadfd0a | 409 | * character. |
fadd7888 | 410 | */ |
bbadfd0a AM |
411 | List<String> strings = Chars.asList(testString.toCharArray()).stream() |
412 | .map(Object::toString) | |
413 | .collect(Collectors.toList()); | |
9d979fda | 414 | |
fadd7888 AM |
415 | Iterable<Iterable<String>> results = |
416 | runConcurrencyTest(queue, strings, poisonPill, 1, 1, 1); | |
417 | ||
418 | assertEquals(strings, Iterables.getOnlyElement(results)); | |
419 | } | |
9d979fda | 420 | |
fadd7888 AM |
421 | /** |
422 | * Run a concurrency test on a {@link BufferedBlockingQueue}, with the | |
423 | * specified number of producer, consumer and observer/iterator threads. | |
424 | * | |
425 | * The returned value represents the elements consumed by each consumer | |
426 | * thread. Thus, if there is one consumer, the top-level {@link Iterable} | |
427 | * will be of size 1, and the inner one should contain all the elements that | |
428 | * were inserted. | |
429 | * | |
430 | * @param queue | |
431 | * The queue to run the test on | |
432 | * @param testBuffer | |
433 | * The data set to insert in the queue. Every producer will | |
434 | * insert one entire set. | |
435 | * @param poisonPill | |
436 | * The "poison pill" to indicate the end. Simply make sure it is | |
437 | * a element of type <T> that is not present in the 'testBuffer'. | |
438 | * @param nbProducerThreads | |
439 | * Number of producer threads. There should be at least 1. | |
440 | * @param nbConsumerThreads | |
441 | * Number of consumer threads. There should be at least 1. | |
442 | * @param nbObserverThreads | |
443 | * Number of observer threads. It should be >= 0. | |
444 | * @return The consumed elements, as seen by each consumer thread. | |
445 | */ | |
5b3fe39a | 446 | private static <T> Iterable<Iterable<T>> runConcurrencyTest(final BufferedBlockingQueue<@NonNull T> queue, |
fadd7888 AM |
447 | final List<T> testBuffer, |
448 | final @NonNull T poisonPill, | |
449 | int nbProducerThreads, | |
450 | int nbConsumerThreads, | |
451 | int nbObserverThreads) { | |
452 | ||
453 | final class ProducerThread implements Runnable { | |
9d979fda MK |
454 | @Override |
455 | public void run() { | |
fadd7888 AM |
456 | for (int i = 0; i < testBuffer.size(); i++) { |
457 | T elem = testBuffer.get(i); | |
458 | if (elem == null) { | |
459 | // TODO replace with List<@NonNull T> once we can | |
460 | throw new IllegalArgumentException(); | |
461 | } | |
462 | queue.put(elem); | |
9d979fda | 463 | } |
d6e2666b AM |
464 | queue.put(poisonPill); |
465 | queue.flushInputBuffer(); | |
9d979fda | 466 | } |
fadd7888 | 467 | } |
9d979fda | 468 | |
fadd7888 AM |
469 | /** |
470 | * The consumer thread will return the elements it read via its Future. | |
471 | * | |
472 | * Note that if there are multiple consumers, there is no guarantee with | |
473 | * regards the contents of an individual one. | |
474 | */ | |
475 | final class ConsumerThread implements Callable<Iterable<T>> { | |
9d979fda | 476 | @Override |
fadd7888 AM |
477 | public Iterable<T> call() { |
478 | List<T> results = new LinkedList<>(); | |
479 | T elem = queue.take(); | |
480 | while (!elem.equals(poisonPill)) { | |
481 | results.add(elem); | |
482 | elem = queue.take(); | |
9d979fda | 483 | } |
fadd7888 | 484 | return results; |
9d979fda | 485 | } |
fadd7888 | 486 | } |
9d979fda | 487 | |
fadd7888 | 488 | final class ObserverThread implements Runnable { |
9d979fda MK |
489 | @Override |
490 | public void run() { | |
fadd7888 AM |
491 | for (int i = 0; i < 5; i++) { |
492 | final Set<T> results = new HashSet<>(); | |
493 | for (T input : queue) { | |
494 | /* | |
495 | * Do something with the element so that this iteration | |
496 | * does not get optimized out. | |
497 | */ | |
d6e2666b | 498 | results.add(input); |
9d979fda MK |
499 | } |
500 | } | |
9d979fda | 501 | } |
fadd7888 | 502 | } |
9d979fda | 503 | |
fadd7888 AM |
504 | if (nbProducerThreads < 1 || nbConsumerThreads < 1 || nbObserverThreads < 0) { |
505 | throw new IllegalArgumentException(); | |
506 | } | |
9d979fda | 507 | |
fadd7888 AM |
508 | final ExecutorService pool = Executors.newFixedThreadPool( |
509 | nbProducerThreads + nbConsumerThreads + nbObserverThreads); | |
9d979fda | 510 | |
fadd7888 AM |
511 | /* Consumed elements, per consumer thread */ |
512 | List<Future<Iterable<T>>> consumedElements = new LinkedList<>(); | |
9d979fda | 513 | |
fadd7888 AM |
514 | for (int i = 0; i < nbProducerThreads; i++) { |
515 | pool.submit(new ProducerThread()); | |
516 | } | |
517 | for (int i = 0; i < nbConsumerThreads; i++) { | |
518 | consumedElements.add(pool.submit(new ConsumerThread())); | |
519 | } | |
520 | for (int i = 0; i < nbObserverThreads; i++) { | |
521 | pool.submit(new ObserverThread()); | |
522 | } | |
523 | ||
524 | List<Iterable<T>> results = new LinkedList<>(); | |
525 | try { | |
526 | /* Convert the Future's to the actual return value */ | |
527 | for (Future<Iterable<T>> future : consumedElements) { | |
528 | Iterable<T> threadResult = future.get(); | |
529 | results.add(threadResult); | |
530 | } | |
531 | ||
532 | pool.shutdown(); | |
533 | boolean success = pool.awaitTermination(2, TimeUnit.MINUTES); | |
534 | if (!success) { | |
535 | throw new InterruptedException(); | |
536 | } | |
537 | ||
538 | } catch (ExecutionException | InterruptedException e) { | |
539 | fail(e.getMessage()); | |
540 | } | |
541 | ||
542 | return results; | |
543 | } | |
47c79d9f | 544 | |
fadd7888 | 545 | } |