Commit | Line | Data |
---|---|---|
9d979fda MK |
1 | /******************************************************************************* |
2 | * Copyright (c) 2015 Ericsson | |
3 | * | |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Matthew Khouzam - Initial API and implementation | |
11 | *******************************************************************************/ | |
12 | ||
13 | package org.eclipse.tracecompass.common.core.tests.collect; | |
14 | ||
9d979fda MK |
15 | import static org.junit.Assert.assertEquals; |
16 | import static org.junit.Assert.assertFalse; | |
17 | import static org.junit.Assert.assertTrue; | |
fadd7888 | 18 | import static org.junit.Assert.fail; |
9d979fda | 19 | |
47c79d9f AM |
20 | import java.util.Collection; |
21 | import java.util.Deque; | |
d6e2666b | 22 | import java.util.HashSet; |
9d979fda | 23 | import java.util.LinkedList; |
fadd7888 | 24 | import java.util.List; |
9d979fda | 25 | import java.util.Random; |
d6e2666b | 26 | import java.util.Set; |
9d979fda MK |
27 | import java.util.concurrent.Callable; |
28 | import java.util.concurrent.ExecutionException; | |
29 | import java.util.concurrent.ExecutorService; | |
30 | import java.util.concurrent.Executors; | |
31 | import java.util.concurrent.Future; | |
32 | import java.util.concurrent.TimeUnit; | |
bbadfd0a | 33 | import java.util.stream.Collectors; |
9d979fda | 34 | |
fadd7888 | 35 | import org.eclipse.jdt.annotation.NonNull; |
9d979fda MK |
36 | import org.eclipse.tracecompass.common.core.collect.BufferedBlockingQueue; |
37 | import org.junit.Before; | |
38 | import org.junit.Rule; | |
39 | import org.junit.Test; | |
40 | import org.junit.rules.TestRule; | |
41 | import org.junit.rules.Timeout; | |
42 | ||
47c79d9f | 43 | import com.google.common.collect.HashMultiset; |
fadd7888 | 44 | import com.google.common.collect.Iterables; |
47c79d9f | 45 | import com.google.common.collect.Iterators; |
fadd7888 | 46 | import com.google.common.primitives.Chars; |
47c79d9f | 47 | |
9d979fda MK |
48 | /** |
49 | * Test suite for the {@link BufferedBlockingQueue} | |
50 | */ | |
51 | public class BufferedBlockingQueueTest { | |
52 | ||
53 | /** Timeout the tests after 2 minutes */ | |
54 | @Rule | |
d291a715 | 55 | public TestRule timeoutRule = new Timeout(2, TimeUnit.MINUTES); |
9d979fda | 56 | |
47c79d9f AM |
57 | private static final String testString = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + |
58 | "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + | |
59 | "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz"; | |
60 | ||
9d979fda MK |
61 | private BufferedBlockingQueue<Character> charQueue; |
62 | ||
63 | /** | |
64 | * Test setup | |
65 | */ | |
66 | @Before | |
67 | public void init() { | |
68 | charQueue = new BufferedBlockingQueue<>(15, 15); | |
69 | } | |
70 | ||
71 | /** | |
72 | * Test inserting one element and removing it. | |
73 | */ | |
74 | @Test | |
75 | public void testSingleInsertion() { | |
76 | Character element = 'x'; | |
77 | charQueue.put(element); | |
78 | charQueue.flushInputBuffer(); | |
79 | ||
80 | Character out = charQueue.take(); | |
81 | assertEquals(element, out); | |
82 | } | |
83 | ||
84 | /** | |
85 | * Test insertion of elements that fit into the input buffer. | |
86 | */ | |
87 | @Test | |
88 | public void testSimpleInsertion() { | |
89 | String string = "Hello world!"; | |
90 | for (char elem : string.toCharArray()) { | |
91 | charQueue.put(elem); | |
92 | } | |
93 | charQueue.flushInputBuffer(); | |
94 | ||
95 | StringBuilder sb = new StringBuilder(); | |
96 | while (!charQueue.isEmpty()) { | |
97 | sb.append(charQueue.take()); | |
98 | } | |
99 | assertEquals(string, sb.toString()); | |
100 | } | |
101 | ||
102 | /** | |
103 | * Test insertion of elements that will require more than one input buffer. | |
104 | */ | |
105 | @Test | |
106 | public void testLargeInsertion() { | |
107 | String string = testString.substring(0, 222); | |
108 | for (char elem : string.toCharArray()) { | |
109 | charQueue.put(elem); | |
110 | } | |
111 | charQueue.flushInputBuffer(); | |
112 | ||
113 | StringBuilder sb = new StringBuilder(); | |
114 | while (!charQueue.isEmpty()) { | |
115 | sb.append(charQueue.take()); | |
116 | } | |
117 | assertEquals(string, sb.toString()); | |
118 | } | |
119 | ||
120 | /** | |
121 | * Test the state of the {@link BufferedBlockingQueue#isEmpty()} method at | |
122 | * various moments. | |
123 | */ | |
124 | @Test | |
125 | public void testIsEmpty() { | |
126 | BufferedBlockingQueue<String> stringQueue = new BufferedBlockingQueue<>(15, 15); | |
127 | assertTrue(stringQueue.isEmpty()); | |
128 | ||
129 | stringQueue.put("Hello"); | |
130 | assertFalse(stringQueue.isEmpty()); | |
131 | ||
132 | stringQueue.flushInputBuffer(); | |
133 | assertFalse(stringQueue.isEmpty()); | |
134 | ||
135 | stringQueue.flushInputBuffer(); | |
136 | assertFalse(stringQueue.isEmpty()); | |
137 | ||
138 | stringQueue.flushInputBuffer(); | |
139 | stringQueue.take(); | |
140 | assertTrue(stringQueue.isEmpty()); | |
141 | ||
142 | stringQueue.flushInputBuffer(); | |
143 | assertTrue(stringQueue.isEmpty()); | |
144 | } | |
145 | ||
146 | /** | |
147 | * Write random data in and read it, several times. | |
148 | */ | |
149 | @Test | |
150 | public void testOddInsertions() { | |
151 | BufferedBlockingQueue<Object> objectQueue = new BufferedBlockingQueue<>(15, 15); | |
152 | LinkedList<Object> expectedValues = new LinkedList<>(); | |
153 | Random rnd = new Random(); | |
154 | rnd.setSeed(123); | |
155 | ||
156 | for (int i = 0; i < 10; i++) { | |
157 | /* | |
158 | * The queue's total size is 225 (15x15). We must make sure to not | |
159 | * fill it up here! | |
160 | */ | |
161 | for (int j = 0; j < 50; j++) { | |
0e4f957e AM |
162 | Integer testInt = rnd.nextInt(); |
163 | Long testLong = rnd.nextLong(); | |
164 | Double testDouble = rnd.nextDouble(); | |
165 | Double testGaussian = rnd.nextGaussian(); | |
9d979fda MK |
166 | |
167 | expectedValues.add(testInt); | |
168 | expectedValues.add(testLong); | |
169 | expectedValues.add(testDouble); | |
170 | expectedValues.add(testGaussian); | |
171 | objectQueue.put(testInt); | |
172 | objectQueue.put(testLong); | |
173 | objectQueue.put(testDouble); | |
174 | objectQueue.put(testGaussian); | |
175 | } | |
176 | objectQueue.flushInputBuffer(); | |
177 | ||
178 | while (!expectedValues.isEmpty()) { | |
179 | Object expected = expectedValues.removeFirst(); | |
180 | Object actual = objectQueue.take(); | |
181 | assertEquals(expected, actual); | |
182 | } | |
183 | } | |
184 | } | |
185 | ||
186 | /** | |
187 | * Read with a producer and a consumer | |
188 | * | |
189 | * @throws InterruptedException | |
190 | * The test was interrupted | |
191 | */ | |
192 | @Test | |
193 | public void testMultiThread() throws InterruptedException { | |
194 | /* A character not found in the test string */ | |
195 | final Character lastElement = '%'; | |
196 | ||
197 | Thread producer = new Thread() { | |
198 | @Override | |
199 | public void run() { | |
200 | for (char c : testString.toCharArray()) { | |
201 | charQueue.put(c); | |
202 | } | |
203 | charQueue.put(lastElement); | |
204 | charQueue.flushInputBuffer(); | |
205 | } | |
206 | }; | |
207 | producer.start(); | |
208 | ||
209 | Thread consumer = new Thread() { | |
210 | @Override | |
211 | public void run() { | |
212 | Character s = charQueue.take(); | |
213 | while (!s.equals(lastElement)) { | |
214 | s = charQueue.take(); | |
215 | } | |
216 | } | |
217 | }; | |
218 | consumer.start(); | |
219 | ||
220 | consumer.join(); | |
221 | producer.join(); | |
222 | } | |
223 | ||
49698f83 BH |
224 | /** |
225 | * Read with a producer and a consumer using | |
226 | * {@link BufferedBlockingQueue#blockingPeek()}. | |
227 | * | |
228 | * @throws InterruptedException | |
229 | * The test was interrupted | |
230 | */ | |
231 | @Test | |
232 | public void testBlockingPeek() throws InterruptedException { | |
233 | /* A character not found in the test string */ | |
234 | final Character lastElement = '%'; | |
235 | ||
236 | final StringBuilder sb = new StringBuilder(); | |
237 | ||
238 | Thread consumer = new Thread() { | |
239 | @Override | |
240 | public void run() { | |
241 | boolean isFinished = false; | |
242 | while (!isFinished) { | |
243 | // Read last element without removing it | |
244 | Character s = charQueue.blockingPeek(); | |
245 | isFinished = s.equals(lastElement); | |
246 | if (!isFinished) { | |
247 | sb.append(s); | |
248 | } | |
249 | // Remove element | |
250 | charQueue.take(); | |
251 | } | |
252 | } | |
253 | }; | |
254 | consumer.start(); | |
255 | ||
256 | Thread producer = new Thread() { | |
257 | @Override | |
258 | public void run() { | |
259 | for (char c : testString.toCharArray()) { | |
260 | charQueue.put(c); | |
261 | } | |
262 | charQueue.put(lastElement); | |
263 | charQueue.flushInputBuffer(); | |
264 | } | |
265 | }; | |
266 | producer.start(); | |
267 | ||
268 | producer.join(); | |
269 | consumer.join(); | |
270 | ||
271 | assertEquals(testString, sb.toString()); | |
272 | } | |
273 | ||
47c79d9f AM |
274 | /** |
275 | * Test the contents returned by {@link BufferedBlockingQueue#iterator()}. | |
276 | * | |
277 | * The test is sequential, because the iterator has no guarantee wrt to its | |
278 | * contents when run concurrently. | |
279 | */ | |
280 | @Test | |
281 | public void testIteratorContents() { | |
282 | Deque<Character> expected = new LinkedList<>(); | |
283 | ||
284 | /* Iterator should be empty initially */ | |
285 | assertFalse(charQueue.iterator().hasNext()); | |
286 | ||
287 | /* Insert the first 50 elements */ | |
288 | for (int i = 0; i < 50; i++) { | |
289 | char c = testString.charAt(i); | |
290 | charQueue.put(c); | |
291 | expected.addFirst(c); | |
292 | } | |
293 | LinkedList<Character> actual = new LinkedList<>(); | |
294 | Iterators.addAll(actual, charQueue.iterator()); | |
295 | assertSameElements(expected, actual); | |
296 | ||
297 | /* | |
298 | * Insert more elements, flush the input buffer (should not affect the | |
299 | * iteration). | |
300 | */ | |
301 | for (int i = 50; i < 60; i++) { | |
302 | char c = testString.charAt(i); | |
303 | charQueue.put(c); | |
304 | charQueue.flushInputBuffer(); | |
305 | expected.addFirst(c); | |
306 | } | |
307 | actual = new LinkedList<>(); | |
308 | Iterators.addAll(actual, charQueue.iterator()); | |
309 | assertSameElements(expected, actual); | |
310 | ||
311 | /* Consume the 30 last elements from the queue */ | |
312 | for (int i = 0; i < 30; i++) { | |
313 | charQueue.take(); | |
314 | expected.removeLast(); | |
315 | } | |
316 | actual = new LinkedList<>(); | |
317 | Iterators.addAll(actual, charQueue.iterator()); | |
318 | assertSameElements(expected, actual); | |
319 | ||
320 | /* Now empty the queue */ | |
321 | while (!charQueue.isEmpty()) { | |
322 | charQueue.take(); | |
323 | expected.removeLast(); | |
324 | } | |
325 | assertFalse(charQueue.iterator().hasNext()); | |
326 | } | |
327 | ||
328 | /** | |
329 | * Utility method to verify that two collections contain the exact same | |
330 | * elements, not necessarily in the same iteration order. | |
331 | * | |
332 | * {@link Collection#equals} requires the iteration order to be the same, | |
333 | * which we do not want here. | |
334 | * | |
335 | * Using a {@link Set} or {@link Collection#containsAll} is not sufficient | |
336 | * either, because those will throw away duplicate elements. | |
337 | */ | |
338 | private static <T> void assertSameElements(Collection<T> c1, Collection<T> c2) { | |
339 | assertEquals(HashMultiset.create(c1), HashMultiset.create(c2)); | |
340 | } | |
341 | ||
9d979fda | 342 | /** |
d6e2666b AM |
343 | * Test iterating on the queue while a producer and a consumer threads are |
344 | * using it. The iteration should not affect the elements taken by the | |
345 | * consumer. | |
9d979fda MK |
346 | */ |
347 | @Test | |
fadd7888 | 348 | public void testConcurrentIteration() { |
5b3fe39a | 349 | final BufferedBlockingQueue<@NonNull String> queue = new BufferedBlockingQueue<>(15, 15); |
fadd7888 | 350 | final String poisonPill = "That's all folks!"; |
9d979fda | 351 | |
fadd7888 AM |
352 | /* |
353 | * Convert the test's testBuffer into an array of String, one for each | |
bbadfd0a | 354 | * character. |
fadd7888 | 355 | */ |
bbadfd0a AM |
356 | List<String> strings = Chars.asList(testString.toCharArray()).stream() |
357 | .map(Object::toString) | |
358 | .collect(Collectors.toList()); | |
9d979fda | 359 | |
fadd7888 AM |
360 | Iterable<Iterable<String>> results = |
361 | runConcurrencyTest(queue, strings, poisonPill, 1, 1, 1); | |
362 | ||
363 | assertEquals(strings, Iterables.getOnlyElement(results)); | |
364 | } | |
9d979fda | 365 | |
fadd7888 AM |
366 | /** |
367 | * Run a concurrency test on a {@link BufferedBlockingQueue}, with the | |
368 | * specified number of producer, consumer and observer/iterator threads. | |
369 | * | |
370 | * The returned value represents the elements consumed by each consumer | |
371 | * thread. Thus, if there is one consumer, the top-level {@link Iterable} | |
372 | * will be of size 1, and the inner one should contain all the elements that | |
373 | * were inserted. | |
374 | * | |
375 | * @param queue | |
376 | * The queue to run the test on | |
377 | * @param testBuffer | |
378 | * The data set to insert in the queue. Every producer will | |
379 | * insert one entire set. | |
380 | * @param poisonPill | |
381 | * The "poison pill" to indicate the end. Simply make sure it is | |
382 | * a element of type <T> that is not present in the 'testBuffer'. | |
383 | * @param nbProducerThreads | |
384 | * Number of producer threads. There should be at least 1. | |
385 | * @param nbConsumerThreads | |
386 | * Number of consumer threads. There should be at least 1. | |
387 | * @param nbObserverThreads | |
388 | * Number of observer threads. It should be >= 0. | |
389 | * @return The consumed elements, as seen by each consumer thread. | |
390 | */ | |
5b3fe39a | 391 | private static <T> Iterable<Iterable<T>> runConcurrencyTest(final BufferedBlockingQueue<@NonNull T> queue, |
fadd7888 AM |
392 | final List<T> testBuffer, |
393 | final @NonNull T poisonPill, | |
394 | int nbProducerThreads, | |
395 | int nbConsumerThreads, | |
396 | int nbObserverThreads) { | |
397 | ||
398 | final class ProducerThread implements Runnable { | |
9d979fda MK |
399 | @Override |
400 | public void run() { | |
fadd7888 AM |
401 | for (int i = 0; i < testBuffer.size(); i++) { |
402 | T elem = testBuffer.get(i); | |
403 | if (elem == null) { | |
404 | // TODO replace with List<@NonNull T> once we can | |
405 | throw new IllegalArgumentException(); | |
406 | } | |
407 | queue.put(elem); | |
9d979fda | 408 | } |
d6e2666b AM |
409 | queue.put(poisonPill); |
410 | queue.flushInputBuffer(); | |
9d979fda | 411 | } |
fadd7888 | 412 | } |
9d979fda | 413 | |
fadd7888 AM |
414 | /** |
415 | * The consumer thread will return the elements it read via its Future. | |
416 | * | |
417 | * Note that if there are multiple consumers, there is no guarantee with | |
418 | * regards the contents of an individual one. | |
419 | */ | |
420 | final class ConsumerThread implements Callable<Iterable<T>> { | |
9d979fda | 421 | @Override |
fadd7888 AM |
422 | public Iterable<T> call() { |
423 | List<T> results = new LinkedList<>(); | |
424 | T elem = queue.take(); | |
425 | while (!elem.equals(poisonPill)) { | |
426 | results.add(elem); | |
427 | elem = queue.take(); | |
9d979fda | 428 | } |
fadd7888 | 429 | return results; |
9d979fda | 430 | } |
fadd7888 | 431 | } |
9d979fda | 432 | |
fadd7888 | 433 | final class ObserverThread implements Runnable { |
9d979fda MK |
434 | @Override |
435 | public void run() { | |
fadd7888 AM |
436 | for (int i = 0; i < 5; i++) { |
437 | final Set<T> results = new HashSet<>(); | |
438 | for (T input : queue) { | |
439 | /* | |
440 | * Do something with the element so that this iteration | |
441 | * does not get optimized out. | |
442 | */ | |
d6e2666b | 443 | results.add(input); |
9d979fda MK |
444 | } |
445 | } | |
9d979fda | 446 | } |
fadd7888 | 447 | } |
9d979fda | 448 | |
fadd7888 AM |
449 | if (nbProducerThreads < 1 || nbConsumerThreads < 1 || nbObserverThreads < 0) { |
450 | throw new IllegalArgumentException(); | |
451 | } | |
9d979fda | 452 | |
fadd7888 AM |
453 | final ExecutorService pool = Executors.newFixedThreadPool( |
454 | nbProducerThreads + nbConsumerThreads + nbObserverThreads); | |
9d979fda | 455 | |
fadd7888 AM |
456 | /* Consumed elements, per consumer thread */ |
457 | List<Future<Iterable<T>>> consumedElements = new LinkedList<>(); | |
9d979fda | 458 | |
fadd7888 AM |
459 | for (int i = 0; i < nbProducerThreads; i++) { |
460 | pool.submit(new ProducerThread()); | |
461 | } | |
462 | for (int i = 0; i < nbConsumerThreads; i++) { | |
463 | consumedElements.add(pool.submit(new ConsumerThread())); | |
464 | } | |
465 | for (int i = 0; i < nbObserverThreads; i++) { | |
466 | pool.submit(new ObserverThread()); | |
467 | } | |
468 | ||
469 | List<Iterable<T>> results = new LinkedList<>(); | |
470 | try { | |
471 | /* Convert the Future's to the actual return value */ | |
472 | for (Future<Iterable<T>> future : consumedElements) { | |
473 | Iterable<T> threadResult = future.get(); | |
474 | results.add(threadResult); | |
475 | } | |
476 | ||
477 | pool.shutdown(); | |
478 | boolean success = pool.awaitTermination(2, TimeUnit.MINUTES); | |
479 | if (!success) { | |
480 | throw new InterruptedException(); | |
481 | } | |
482 | ||
483 | } catch (ExecutionException | InterruptedException e) { | |
484 | fail(e.getMessage()); | |
485 | } | |
486 | ||
487 | return results; | |
488 | } | |
47c79d9f | 489 | |
fadd7888 | 490 | } |