1 /*******************************************************************************
2 * Copyright (c) 2015 Ericsson, EfficiOS Inc., and others
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Matthew Khouzam - Initial API and implementation
11 * Alexandre Montplaisir - Initial API and implementation
12 *******************************************************************************/
14 package org
.eclipse
.tracecompass
.common
.core
.collect
;
16 import static org
.eclipse
.tracecompass
.common
.core
.NonNullUtils
.checkNotNull
;
18 import java
.util
.Deque
;
19 import java
.util
.Iterator
;
20 import java
.util
.NoSuchElementException
;
21 import java
.util
.concurrent
.BlockingDeque
;
22 import java
.util
.concurrent
.BlockingQueue
;
23 import java
.util
.concurrent
.ConcurrentLinkedDeque
;
24 import java
.util
.concurrent
.LinkedBlockingDeque
;
25 import java
.util
.concurrent
.atomic
.AtomicInteger
;
26 import java
.util
.concurrent
.locks
.Condition
;
27 import java
.util
.concurrent
.locks
.Lock
;
28 import java
.util
.concurrent
.locks
.ReentrantLock
;
30 import org
.eclipse
.jdt
.annotation
.Nullable
;
31 import org
.eclipse
.tracecompass
.internal
.common
.core
.Activator
;
/**
 * A BufferedBlockingQueue is a wrapper around a {@link BlockingQueue}, which
 * provides input and output "buffers", so that chunks of elements are inserted
 * into the output buffer, rather than individual elements.
 *
 * The API provides usual put() and take() methods which work on single
 * elements. This class abstracts the concept of chunking, as well as the
 * required locking, from the users.
 *
 * The main use case is for when different threads are doing insertion and
 * removal operations. The added buffering reduces the contention between those
 * threads.
 *
 * @param <T>
 *            The data type of the elements contained by the queue
 */
50 public class BufferedBlockingQueue
<T
> implements Iterable
<T
> {
52 private final BlockingDeque
<Deque
<T
>> fInnerQueue
;
53 private final Lock fInputLock
= new ReentrantLock();
54 private final Lock fOutputLock
= new ReentrantLock();
55 private final int fChunkSize
;
57 private Deque
<T
> fInputBuffer
;
58 private Deque
<T
> fOutputBuffer
;
61 * ConcurrentLinkedDeque's size() method does not run in constant time.
62 * Since we know we will only increment it, keep track of the number of
63 * insertions ourselves. It does not matter if the actual size is not exact.
65 private int fInputBufferSize
;
67 private final AtomicInteger fSize
= new AtomicInteger(0);
69 private final Condition fInnerQueueNotEmpty
= checkNotNull(fOutputLock
.newCondition());
75 * The size of the actual blocking queue. This is the number of
76 * *chunks* that will go in the queue.
78 * The size of an individual chunk.
80 public BufferedBlockingQueue(int queueSize
, int chunkSize
) {
81 /* Add one to the queue size for the output buffer */
82 fInnerQueue
= new LinkedBlockingDeque
<>(queueSize
+ 1);
83 fChunkSize
= chunkSize
;
85 fInputBuffer
= new ConcurrentLinkedDeque
<>();
87 * Create an empty output buffer to avoid a null reference, and add it
88 * to the queue. The output buffer is always the head of the queue.
90 fOutputBuffer
= new ConcurrentLinkedDeque
<>();
91 fInnerQueue
.add(fOutputBuffer
);
95 * Put an element at the tail of the queue.
97 * This method will block the caller if the output buffer is full, waiting
98 * for space to become available.
101 * The element to insert
103 public void put(T element
) {
106 fInputBuffer
.add(element
);
107 fSize
.incrementAndGet();
109 if (fInputBufferSize
>= fChunkSize
) {
110 this.flushInputBuffer();
118 * Flush the current input buffer, disregarding the expected buffer size
121 * This will guarantee that an element that was inserted via the
122 * {@link #put} method becomes visible to the {@link #take} method.
124 * This method will block if the output buffer is currently full, waiting
125 * for space to become available.
127 public void flushInputBuffer() {
128 boolean signal
= false;
132 * This call blocks if the inner queue is full, effectively blocking
133 * the caller until elements are removed via the take() method.
135 if (!fInputBuffer
.isEmpty()) {
136 fInnerQueue
.put(fInputBuffer
);
137 fInputBuffer
= new ConcurrentLinkedDeque
<>();
138 fInputBufferSize
= 0;
142 } catch (InterruptedException e
) {
143 Activator
.instance().logError("Buffered queue interrupted", e
); //$NON-NLS-1$
150 fInnerQueueNotEmpty
.signalAll();
152 fOutputLock
.unlock();
158 * Retrieve the head element from the queue.
160 * If the output buffer is empty, this call will block until an element is
161 * inserted and fills the input buffer, or until the not-empty input buffer
162 * is otherwise manually flushed.
164 * @return The retrieved element. It will be removed from the queue.
169 if (fOutputBuffer
.isEmpty()) {
171 * Our read buffer is empty, remove it from the queue and peek
172 * the next buffer in the queue. The loop will block if the
173 * inner queue is empty, releasing the lock while it waits.
175 fInnerQueue
.remove();
176 while (fInnerQueue
.isEmpty()) {
177 fInnerQueueNotEmpty
.await();
179 fOutputBuffer
= checkNotNull(fInnerQueue
.peek());
181 /* Our implementation guarantees this output buffer is not empty. */
182 T element
= checkNotNull(fOutputBuffer
.remove());
183 fSize
.decrementAndGet();
185 } catch (InterruptedException e
) {
186 Activator
.instance().logError("Buffered queue interrupted", e
); //$NON-NLS-1$
187 throw new IllegalStateException();
189 fOutputLock
.unlock();
194 * Retrieve, but do not remove, the head element of this queue.
196 * If the output buffer is empty, this call will block until an element is
197 * inserted and fills the input buffer, or until the not-empty input buffer
198 * is otherwise manually flushed.
200 * @return The head element of this queue, blocking until one is available
203 public T
blockingPeek() {
206 if (fOutputBuffer
.isEmpty()) {
208 * Our read buffer is empty, remove it from the queue and peek
209 * the next buffer in the queue. The loop will block if the
210 * inner queue is empty, releasing the lock while it waits.
212 fInnerQueue
.remove();
213 while (fInnerQueue
.isEmpty()) {
214 fInnerQueueNotEmpty
.await();
216 fOutputBuffer
= checkNotNull(fInnerQueue
.peek());
218 /* Our implementation guarantees this output buffer is not empty. */
219 return checkNotNull(fOutputBuffer
.peek());
220 } catch (InterruptedException e
) {
221 Activator
.instance().logError("Buffered queue interrupted", e
); //$NON-NLS-1$
222 throw new IllegalStateException();
224 fOutputLock
.unlock();
229 * Returns true if the queue size is 0.
231 * @return true if the queue is empty
233 public boolean isEmpty() {
234 return (fSize
.get() == 0);
238 * Returns the number of elements in this queue.
240 * @return the number of elements in this queue
248 * Instantiate an iterator on the complete data structure. This includes the
249 * input buffer as well as the output buffer. The elements will be returned
250 * in order from last (tail) to first (head).
252 * If concurrent removals happen while the iterator is being used, it is
253 * possible for an element that was actually in the queue when the call was
254 * made to have been removed by the {@link #take} method in the meantime.
255 * However, this iterator guarantees that each element is either inside the
256 * queue OR was removed by the {@link #take} method. No element should
257 * "fall in the cracks".
259 * The iterator itself is not safe to use concurrently by different threads.
261 * The {@link Iterator#remove()} operation is not supported by this
264 * @return An iterator over the whole buffered queue in reverse sequence
267 public Iterator
<T
> iterator() {
271 private class Itr
implements Iterator
<T
> {
273 * Note that the iterators of LinkedBlockingDeque and
274 * ConcurrentLinkedDeque are thread-safe, which allows iterating on them
275 * while they are being modified without having to lock the accesses.
277 * To make sure we do not "miss" any elements, we need to look through
278 * the input buffer first, then the inner queue buffers in descending
279 * order, ending with the output buffer.
281 private @Nullable T fNext
= null;
282 private Iterator
<T
> fBufferIterator
;
283 private final Iterator
<Deque
<T
>> fQueueIterator
;
288 fBufferIterator
= checkNotNull(fInputBuffer
.descendingIterator());
289 fQueueIterator
= checkNotNull(fInnerQueue
.descendingIterator());
296 public boolean hasNext() {
300 if (fBufferIterator
.hasNext()) {
301 fNext
= fBufferIterator
.next();
304 if (fQueueIterator
.hasNext()) {
305 fBufferIterator
= checkNotNull(fQueueIterator
.next().descendingIterator());
314 @Nullable T next
= fNext
;
320 throw new NoSuchElementException();
324 public void remove() {
325 throw new UnsupportedOperationException();