Commit | Line | Data |
---|---|---|
9d979fda MK |
1 | /******************************************************************************* |
2 | * Copyright (c) 2015 Ericsson | |
3 | * | |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Matthew Khouzam - Initial API and implementation | |
11 | *******************************************************************************/ | |
12 | ||
13 | package org.eclipse.tracecompass.common.core.tests.collect; | |
14 | ||
9d979fda MK |
15 | import static org.junit.Assert.assertEquals; |
16 | import static org.junit.Assert.assertFalse; | |
17 | import static org.junit.Assert.assertTrue; | |
fadd7888 | 18 | import static org.junit.Assert.fail; |
9d979fda | 19 | |
47c79d9f AM |
20 | import java.util.Collection; |
21 | import java.util.Deque; | |
d6e2666b | 22 | import java.util.HashSet; |
9d979fda | 23 | import java.util.LinkedList; |
fadd7888 | 24 | import java.util.List; |
9d979fda | 25 | import java.util.Random; |
d6e2666b | 26 | import java.util.Set; |
9d979fda MK |
27 | import java.util.concurrent.Callable; |
28 | import java.util.concurrent.ExecutionException; | |
29 | import java.util.concurrent.ExecutorService; | |
30 | import java.util.concurrent.Executors; | |
31 | import java.util.concurrent.Future; | |
32 | import java.util.concurrent.TimeUnit; | |
bbadfd0a | 33 | import java.util.stream.Collectors; |
9d979fda | 34 | |
fadd7888 | 35 | import org.eclipse.jdt.annotation.NonNull; |
9d979fda MK |
36 | import org.eclipse.tracecompass.common.core.NonNullUtils; |
37 | import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue; | |
38 | import org.junit.Before; | |
39 | import org.junit.Rule; | |
40 | import org.junit.Test; | |
41 | import org.junit.rules.TestRule; | |
42 | import org.junit.rules.Timeout; | |
43 | ||
47c79d9f | 44 | import com.google.common.collect.HashMultiset; |
fadd7888 | 45 | import com.google.common.collect.Iterables; |
47c79d9f | 46 | import com.google.common.collect.Iterators; |
fadd7888 | 47 | import com.google.common.primitives.Chars; |
47c79d9f | 48 | |
9d979fda MK |
49 | /** |
50 | * Test suite for the {@link BufferedBlockingQueue} | |
51 | */ | |
52 | public class BufferedBlockingQueueTest { | |
53 | ||
54 | /** Timeout the tests after 2 minutes */ | |
55 | @Rule | |
d291a715 | 56 | public TestRule timeoutRule = new Timeout(2, TimeUnit.MINUTES); |
9d979fda | 57 | |
47c79d9f AM |
58 | private static final String testString = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + |
59 | "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + | |
60 | "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz"; | |
61 | ||
9d979fda MK |
62 | private BufferedBlockingQueue<Character> charQueue; |
63 | ||
64 | /** | |
65 | * Test setup | |
66 | */ | |
67 | @Before | |
68 | public void init() { | |
69 | charQueue = new BufferedBlockingQueue<>(15, 15); | |
70 | } | |
71 | ||
72 | /** | |
73 | * Test inserting one element and removing it. | |
74 | */ | |
75 | @Test | |
76 | public void testSingleInsertion() { | |
77 | Character element = 'x'; | |
78 | charQueue.put(element); | |
79 | charQueue.flushInputBuffer(); | |
80 | ||
81 | Character out = charQueue.take(); | |
82 | assertEquals(element, out); | |
83 | } | |
84 | ||
85 | /** | |
86 | * Test insertion of elements that fit into the input buffer. | |
87 | */ | |
88 | @Test | |
89 | public void testSimpleInsertion() { | |
90 | String string = "Hello world!"; | |
91 | for (char elem : string.toCharArray()) { | |
92 | charQueue.put(elem); | |
93 | } | |
94 | charQueue.flushInputBuffer(); | |
95 | ||
96 | StringBuilder sb = new StringBuilder(); | |
97 | while (!charQueue.isEmpty()) { | |
98 | sb.append(charQueue.take()); | |
99 | } | |
100 | assertEquals(string, sb.toString()); | |
101 | } | |
102 | ||
103 | /** | |
104 | * Test insertion of elements that will require more than one input buffer. | |
105 | */ | |
106 | @Test | |
107 | public void testLargeInsertion() { | |
108 | String string = testString.substring(0, 222); | |
109 | for (char elem : string.toCharArray()) { | |
110 | charQueue.put(elem); | |
111 | } | |
112 | charQueue.flushInputBuffer(); | |
113 | ||
114 | StringBuilder sb = new StringBuilder(); | |
115 | while (!charQueue.isEmpty()) { | |
116 | sb.append(charQueue.take()); | |
117 | } | |
118 | assertEquals(string, sb.toString()); | |
119 | } | |
120 | ||
121 | /** | |
122 | * Test the state of the {@link BufferedBlockingQueue#isEmpty()} method at | |
123 | * various moments. | |
124 | */ | |
125 | @Test | |
126 | public void testIsEmpty() { | |
127 | BufferedBlockingQueue<String> stringQueue = new BufferedBlockingQueue<>(15, 15); | |
128 | assertTrue(stringQueue.isEmpty()); | |
129 | ||
130 | stringQueue.put("Hello"); | |
131 | assertFalse(stringQueue.isEmpty()); | |
132 | ||
133 | stringQueue.flushInputBuffer(); | |
134 | assertFalse(stringQueue.isEmpty()); | |
135 | ||
136 | stringQueue.flushInputBuffer(); | |
137 | assertFalse(stringQueue.isEmpty()); | |
138 | ||
139 | stringQueue.flushInputBuffer(); | |
140 | stringQueue.take(); | |
141 | assertTrue(stringQueue.isEmpty()); | |
142 | ||
143 | stringQueue.flushInputBuffer(); | |
144 | assertTrue(stringQueue.isEmpty()); | |
145 | } | |
146 | ||
147 | /** | |
148 | * Write random data in and read it, several times. | |
149 | */ | |
150 | @Test | |
151 | public void testOddInsertions() { | |
152 | BufferedBlockingQueue<Object> objectQueue = new BufferedBlockingQueue<>(15, 15); | |
153 | LinkedList<Object> expectedValues = new LinkedList<>(); | |
154 | Random rnd = new Random(); | |
155 | rnd.setSeed(123); | |
156 | ||
157 | for (int i = 0; i < 10; i++) { | |
158 | /* | |
159 | * The queue's total size is 225 (15x15). We must make sure to not | |
160 | * fill it up here! | |
161 | */ | |
162 | for (int j = 0; j < 50; j++) { | |
163 | Integer testInt = NonNullUtils.checkNotNull(rnd.nextInt()); | |
164 | Long testLong = NonNullUtils.checkNotNull(rnd.nextLong()); | |
165 | Double testDouble = NonNullUtils.checkNotNull(rnd.nextDouble()); | |
166 | Double testGaussian = NonNullUtils.checkNotNull(rnd.nextGaussian()); | |
167 | ||
168 | expectedValues.add(testInt); | |
169 | expectedValues.add(testLong); | |
170 | expectedValues.add(testDouble); | |
171 | expectedValues.add(testGaussian); | |
172 | objectQueue.put(testInt); | |
173 | objectQueue.put(testLong); | |
174 | objectQueue.put(testDouble); | |
175 | objectQueue.put(testGaussian); | |
176 | } | |
177 | objectQueue.flushInputBuffer(); | |
178 | ||
179 | while (!expectedValues.isEmpty()) { | |
180 | Object expected = expectedValues.removeFirst(); | |
181 | Object actual = objectQueue.take(); | |
182 | assertEquals(expected, actual); | |
183 | } | |
184 | } | |
185 | } | |
186 | ||
187 | /** | |
188 | * Read with a producer and a consumer | |
189 | * | |
190 | * @throws InterruptedException | |
191 | * The test was interrupted | |
192 | */ | |
193 | @Test | |
194 | public void testMultiThread() throws InterruptedException { | |
195 | /* A character not found in the test string */ | |
196 | final Character lastElement = '%'; | |
197 | ||
198 | Thread producer = new Thread() { | |
199 | @Override | |
200 | public void run() { | |
201 | for (char c : testString.toCharArray()) { | |
202 | charQueue.put(c); | |
203 | } | |
204 | charQueue.put(lastElement); | |
205 | charQueue.flushInputBuffer(); | |
206 | } | |
207 | }; | |
208 | producer.start(); | |
209 | ||
210 | Thread consumer = new Thread() { | |
211 | @Override | |
212 | public void run() { | |
213 | Character s = charQueue.take(); | |
214 | while (!s.equals(lastElement)) { | |
215 | s = charQueue.take(); | |
216 | } | |
217 | } | |
218 | }; | |
219 | consumer.start(); | |
220 | ||
221 | consumer.join(); | |
222 | producer.join(); | |
223 | } | |
224 | ||
49698f83 BH |
225 | /** |
226 | * Read with a producer and a consumer using | |
227 | * {@link BufferedBlockingQueue#blockingPeek()}. | |
228 | * | |
229 | * @throws InterruptedException | |
230 | * The test was interrupted | |
231 | */ | |
232 | @Test | |
233 | public void testBlockingPeek() throws InterruptedException { | |
234 | /* A character not found in the test string */ | |
235 | final Character lastElement = '%'; | |
236 | ||
237 | final StringBuilder sb = new StringBuilder(); | |
238 | ||
239 | Thread consumer = new Thread() { | |
240 | @Override | |
241 | public void run() { | |
242 | boolean isFinished = false; | |
243 | while (!isFinished) { | |
244 | // Read last element without removing it | |
245 | Character s = charQueue.blockingPeek(); | |
246 | isFinished = s.equals(lastElement); | |
247 | if (!isFinished) { | |
248 | sb.append(s); | |
249 | } | |
250 | // Remove element | |
251 | charQueue.take(); | |
252 | } | |
253 | } | |
254 | }; | |
255 | consumer.start(); | |
256 | ||
257 | Thread producer = new Thread() { | |
258 | @Override | |
259 | public void run() { | |
260 | for (char c : testString.toCharArray()) { | |
261 | charQueue.put(c); | |
262 | } | |
263 | charQueue.put(lastElement); | |
264 | charQueue.flushInputBuffer(); | |
265 | } | |
266 | }; | |
267 | producer.start(); | |
268 | ||
269 | producer.join(); | |
270 | consumer.join(); | |
271 | ||
272 | assertEquals(testString, sb.toString()); | |
273 | } | |
274 | ||
47c79d9f AM |
275 | /** |
276 | * Test the contents returned by {@link BufferedBlockingQueue#iterator()}. | |
277 | * | |
278 | * The test is sequential, because the iterator has no guarantee wrt to its | |
279 | * contents when run concurrently. | |
280 | */ | |
281 | @Test | |
282 | public void testIteratorContents() { | |
283 | Deque<Character> expected = new LinkedList<>(); | |
284 | ||
285 | /* Iterator should be empty initially */ | |
286 | assertFalse(charQueue.iterator().hasNext()); | |
287 | ||
288 | /* Insert the first 50 elements */ | |
289 | for (int i = 0; i < 50; i++) { | |
290 | char c = testString.charAt(i); | |
291 | charQueue.put(c); | |
292 | expected.addFirst(c); | |
293 | } | |
294 | LinkedList<Character> actual = new LinkedList<>(); | |
295 | Iterators.addAll(actual, charQueue.iterator()); | |
296 | assertSameElements(expected, actual); | |
297 | ||
298 | /* | |
299 | * Insert more elements, flush the input buffer (should not affect the | |
300 | * iteration). | |
301 | */ | |
302 | for (int i = 50; i < 60; i++) { | |
303 | char c = testString.charAt(i); | |
304 | charQueue.put(c); | |
305 | charQueue.flushInputBuffer(); | |
306 | expected.addFirst(c); | |
307 | } | |
308 | actual = new LinkedList<>(); | |
309 | Iterators.addAll(actual, charQueue.iterator()); | |
310 | assertSameElements(expected, actual); | |
311 | ||
312 | /* Consume the 30 last elements from the queue */ | |
313 | for (int i = 0; i < 30; i++) { | |
314 | charQueue.take(); | |
315 | expected.removeLast(); | |
316 | } | |
317 | actual = new LinkedList<>(); | |
318 | Iterators.addAll(actual, charQueue.iterator()); | |
319 | assertSameElements(expected, actual); | |
320 | ||
321 | /* Now empty the queue */ | |
322 | while (!charQueue.isEmpty()) { | |
323 | charQueue.take(); | |
324 | expected.removeLast(); | |
325 | } | |
326 | assertFalse(charQueue.iterator().hasNext()); | |
327 | } | |
328 | ||
329 | /** | |
330 | * Utility method to verify that two collections contain the exact same | |
331 | * elements, not necessarily in the same iteration order. | |
332 | * | |
333 | * {@link Collection#equals} requires the iteration order to be the same, | |
334 | * which we do not want here. | |
335 | * | |
336 | * Using a {@link Set} or {@link Collection#containsAll} is not sufficient | |
337 | * either, because those will throw away duplicate elements. | |
338 | */ | |
339 | private static <T> void assertSameElements(Collection<T> c1, Collection<T> c2) { | |
340 | assertEquals(HashMultiset.create(c1), HashMultiset.create(c2)); | |
341 | } | |
342 | ||
9d979fda | 343 | /** |
d6e2666b AM |
344 | * Test iterating on the queue while a producer and a consumer threads are |
345 | * using it. The iteration should not affect the elements taken by the | |
346 | * consumer. | |
9d979fda MK |
347 | */ |
348 | @Test | |
fadd7888 | 349 | public void testConcurrentIteration() { |
d6e2666b | 350 | final BufferedBlockingQueue<String> queue = new BufferedBlockingQueue<>(15, 15); |
fadd7888 | 351 | final String poisonPill = "That's all folks!"; |
9d979fda | 352 | |
fadd7888 AM |
353 | /* |
354 | * Convert the test's testBuffer into an array of String, one for each | |
bbadfd0a | 355 | * character. |
fadd7888 | 356 | */ |
bbadfd0a AM |
357 | List<String> strings = Chars.asList(testString.toCharArray()).stream() |
358 | .map(Object::toString) | |
359 | .collect(Collectors.toList()); | |
9d979fda | 360 | |
fadd7888 AM |
361 | Iterable<Iterable<String>> results = |
362 | runConcurrencyTest(queue, strings, poisonPill, 1, 1, 1); | |
363 | ||
364 | assertEquals(strings, Iterables.getOnlyElement(results)); | |
365 | } | |
9d979fda | 366 | |
fadd7888 AM |
367 | /** |
368 | * Run a concurrency test on a {@link BufferedBlockingQueue}, with the | |
369 | * specified number of producer, consumer and observer/iterator threads. | |
370 | * | |
371 | * The returned value represents the elements consumed by each consumer | |
372 | * thread. Thus, if there is one consumer, the top-level {@link Iterable} | |
373 | * will be of size 1, and the inner one should contain all the elements that | |
374 | * were inserted. | |
375 | * | |
376 | * @param queue | |
377 | * The queue to run the test on | |
378 | * @param testBuffer | |
379 | * The data set to insert in the queue. Every producer will | |
380 | * insert one entire set. | |
381 | * @param poisonPill | |
382 | * The "poison pill" to indicate the end. Simply make sure it is | |
383 | * a element of type <T> that is not present in the 'testBuffer'. | |
384 | * @param nbProducerThreads | |
385 | * Number of producer threads. There should be at least 1. | |
386 | * @param nbConsumerThreads | |
387 | * Number of consumer threads. There should be at least 1. | |
388 | * @param nbObserverThreads | |
389 | * Number of observer threads. It should be >= 0. | |
390 | * @return The consumed elements, as seen by each consumer thread. | |
391 | */ | |
392 | private static <T> Iterable<Iterable<T>> runConcurrencyTest(final BufferedBlockingQueue<T> queue, | |
393 | final List<T> testBuffer, | |
394 | final @NonNull T poisonPill, | |
395 | int nbProducerThreads, | |
396 | int nbConsumerThreads, | |
397 | int nbObserverThreads) { | |
398 | ||
399 | final class ProducerThread implements Runnable { | |
9d979fda MK |
400 | @Override |
401 | public void run() { | |
fadd7888 AM |
402 | for (int i = 0; i < testBuffer.size(); i++) { |
403 | T elem = testBuffer.get(i); | |
404 | if (elem == null) { | |
405 | // TODO replace with List<@NonNull T> once we can | |
406 | throw new IllegalArgumentException(); | |
407 | } | |
408 | queue.put(elem); | |
9d979fda | 409 | } |
d6e2666b AM |
410 | queue.put(poisonPill); |
411 | queue.flushInputBuffer(); | |
9d979fda | 412 | } |
fadd7888 | 413 | } |
9d979fda | 414 | |
fadd7888 AM |
415 | /** |
416 | * The consumer thread will return the elements it read via its Future. | |
417 | * | |
418 | * Note that if there are multiple consumers, there is no guarantee with | |
419 | * regards the contents of an individual one. | |
420 | */ | |
421 | final class ConsumerThread implements Callable<Iterable<T>> { | |
9d979fda | 422 | @Override |
fadd7888 AM |
423 | public Iterable<T> call() { |
424 | List<T> results = new LinkedList<>(); | |
425 | T elem = queue.take(); | |
426 | while (!elem.equals(poisonPill)) { | |
427 | results.add(elem); | |
428 | elem = queue.take(); | |
9d979fda | 429 | } |
fadd7888 | 430 | return results; |
9d979fda | 431 | } |
fadd7888 | 432 | } |
9d979fda | 433 | |
fadd7888 | 434 | final class ObserverThread implements Runnable { |
9d979fda MK |
435 | @Override |
436 | public void run() { | |
fadd7888 AM |
437 | for (int i = 0; i < 5; i++) { |
438 | final Set<T> results = new HashSet<>(); | |
439 | for (T input : queue) { | |
440 | /* | |
441 | * Do something with the element so that this iteration | |
442 | * does not get optimized out. | |
443 | */ | |
d6e2666b | 444 | results.add(input); |
9d979fda MK |
445 | } |
446 | } | |
9d979fda | 447 | } |
fadd7888 | 448 | } |
9d979fda | 449 | |
fadd7888 AM |
450 | if (nbProducerThreads < 1 || nbConsumerThreads < 1 || nbObserverThreads < 0) { |
451 | throw new IllegalArgumentException(); | |
452 | } | |
9d979fda | 453 | |
fadd7888 AM |
454 | final ExecutorService pool = Executors.newFixedThreadPool( |
455 | nbProducerThreads + nbConsumerThreads + nbObserverThreads); | |
9d979fda | 456 | |
fadd7888 AM |
457 | /* Consumed elements, per consumer thread */ |
458 | List<Future<Iterable<T>>> consumedElements = new LinkedList<>(); | |
9d979fda | 459 | |
fadd7888 AM |
460 | for (int i = 0; i < nbProducerThreads; i++) { |
461 | pool.submit(new ProducerThread()); | |
462 | } | |
463 | for (int i = 0; i < nbConsumerThreads; i++) { | |
464 | consumedElements.add(pool.submit(new ConsumerThread())); | |
465 | } | |
466 | for (int i = 0; i < nbObserverThreads; i++) { | |
467 | pool.submit(new ObserverThread()); | |
468 | } | |
469 | ||
470 | List<Iterable<T>> results = new LinkedList<>(); | |
471 | try { | |
472 | /* Convert the Future's to the actual return value */ | |
473 | for (Future<Iterable<T>> future : consumedElements) { | |
474 | Iterable<T> threadResult = future.get(); | |
475 | results.add(threadResult); | |
476 | } | |
477 | ||
478 | pool.shutdown(); | |
479 | boolean success = pool.awaitTermination(2, TimeUnit.MINUTES); | |
480 | if (!success) { | |
481 | throw new InterruptedException(); | |
482 | } | |
483 | ||
484 | } catch (ExecutionException | InterruptedException e) { | |
485 | fail(e.getMessage()); | |
486 | } | |
487 | ||
488 | return results; | |
489 | } | |
47c79d9f | 490 | |
fadd7888 | 491 | } |