releng: Transition to jdt.annotation 2.0
[deliverable/tracecompass.git] / common / org.eclipse.tracecompass.common.core / src / org / eclipse / tracecompass / common / core / collect / BufferedBlockingQueue.java
1 /*******************************************************************************
2 * Copyright (c) 2015 Ericsson, EfficiOS Inc., and others
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Matthew Khouzam - Initial API and implementation
11 * Alexandre Montplaisir - Initial API and implementation
12 *******************************************************************************/
13
14 package org.eclipse.tracecompass.common.core.collect;
15
16 import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNull;
17
18 import java.util.Deque;
19 import java.util.Iterator;
20 import java.util.NoSuchElementException;
21 import java.util.concurrent.BlockingDeque;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentLinkedDeque;
24 import java.util.concurrent.LinkedBlockingDeque;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.eclipse.tracecompass.internal.common.core.Activator;
32
33 /**
34 * A BufferedBlockingQueue is a wrapper around a {@link BlockingQueue}, which
35 * provides input and output "buffers", so that chunks of elements are inserted
36 * into the output buffer, rather than individual elements.
37 * <p>
38 * The API provides usual put() and take() methods which work on single
39 * elements. This class abstracts the concept of chunking, as well as the
40 * required locking, from the users.
41 * <p>
42 * The main use case is for when different threads are doing insertion and
43 * removal operations. The added buffering reduces the contention between those
44 * two threads.
45 *
46 * @param <T>
47 * The data type of the elements contained by the queue
48 * @since 1.0
49 */
50 public class BufferedBlockingQueue<T> implements Iterable<T> {
51
52 private final BlockingDeque<Deque<T>> fInnerQueue;
53 private final Lock fInputLock = new ReentrantLock();
54 private final Lock fOutputLock = new ReentrantLock();
55 private final int fChunkSize;
56
57 private Deque<T> fInputBuffer;
58 private Deque<T> fOutputBuffer;
59
60 /*
61 * ConcurrentLinkedDeque's size() method does not run in constant time.
62 * Since we know we will only increment it, keep track of the number of
63 * insertions ourselves. It does not matter if the actual size is not exact.
64 */
65 private int fInputBufferSize;
66
67 private final AtomicInteger fSize = new AtomicInteger(0);
68
69 private final Condition fInnerQueueNotEmpty = checkNotNull(fOutputLock.newCondition());
70
71 /**
72 * Constructor
73 *
74 * @param queueSize
75 * The size of the actual blocking queue. This is the number of
76 * *chunks* that will go in the queue.
77 * @param chunkSize
78 * The size of an individual chunk.
79 */
80 public BufferedBlockingQueue(int queueSize, int chunkSize) {
81 /* Add one to the queue size for the output buffer */
82 fInnerQueue = new LinkedBlockingDeque<>(queueSize + 1);
83 fChunkSize = chunkSize;
84
85 fInputBuffer = new ConcurrentLinkedDeque<>();
86 /*
87 * Create an empty output buffer to avoid a null reference, and add it
88 * to the queue. The output buffer is always the head of the queue.
89 */
90 fOutputBuffer = new ConcurrentLinkedDeque<>();
91 fInnerQueue.add(fOutputBuffer);
92 }
93
94 /**
95 * Put an element at the tail of the queue.
96 * <p>
97 * This method will block the caller if the output buffer is full, waiting
98 * for space to become available.
99 *
100 * @param element
101 * The element to insert
102 */
103 public void put(T element) {
104 fInputLock.lock();
105 try {
106 fInputBuffer.add(element);
107 fSize.incrementAndGet();
108 fInputBufferSize++;
109 if (fInputBufferSize >= fChunkSize) {
110 this.flushInputBuffer();
111 }
112 } finally {
113 fInputLock.unlock();
114 }
115 }
116
117 /**
118 * Flush the current input buffer, disregarding the expected buffer size
119 * limit.
120 * <p>
121 * This will guarantee that an element that was inserted via the
122 * {@link #put} method becomes visible to the {@link #take} method.
123 *
124 * This method will block if the output buffer is currently full, waiting
125 * for space to become available.
126 */
127 public void flushInputBuffer() {
128 boolean signal = false;
129 fInputLock.lock();
130 try {
131 /*
132 * This call blocks if the inner queue is full, effectively blocking
133 * the caller until elements are removed via the take() method.
134 */
135 if (!fInputBuffer.isEmpty()) {
136 fInnerQueue.put(fInputBuffer);
137 fInputBuffer = new ConcurrentLinkedDeque<>();
138 fInputBufferSize = 0;
139 signal = true;
140 }
141
142 } catch (InterruptedException e) {
143 Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$
144 } finally {
145 fInputLock.unlock();
146 }
147 if (signal) {
148 fOutputLock.lock();
149 try {
150 fInnerQueueNotEmpty.signalAll();
151 } finally {
152 fOutputLock.unlock();
153 }
154 }
155 }
156
157 /**
158 * Retrieve the head element from the queue.
159 * <p>
160 * If the output buffer is empty, this call will block until an element is
161 * inserted and fills the input buffer, or until the not-empty input buffer
162 * is otherwise manually flushed.
163 *
164 * @return The retrieved element. It will be removed from the queue.
165 */
166 public T take() {
167 fOutputLock.lock();
168 try {
169 if (fOutputBuffer.isEmpty()) {
170 /*
171 * Our read buffer is empty, remove it from the queue and peek
172 * the next buffer in the queue. The loop will block if the
173 * inner queue is empty, releasing the lock while it waits.
174 */
175 fInnerQueue.remove();
176 while (fInnerQueue.isEmpty()) {
177 fInnerQueueNotEmpty.await();
178 }
179 fOutputBuffer = checkNotNull(fInnerQueue.peek());
180 }
181 /* Our implementation guarantees this output buffer is not empty. */
182 T element = checkNotNull(fOutputBuffer.remove());
183 fSize.decrementAndGet();
184 return element;
185 } catch (InterruptedException e) {
186 Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$
187 throw new IllegalStateException();
188 } finally {
189 fOutputLock.unlock();
190 }
191 }
192
193 /**
194 * Retrieve, but do not remove, the head element of this queue.
195 * <p>
196 * If the output buffer is empty, this call will block until an element is
197 * inserted and fills the input buffer, or until the not-empty input buffer
198 * is otherwise manually flushed.
199 *
200 * @return The head element of this queue, blocking until one is available
201 * @since 1.1
202 */
203 public T blockingPeek() {
204 fOutputLock.lock();
205 try {
206 if (fOutputBuffer.isEmpty()) {
207 /*
208 * Our read buffer is empty, remove it from the queue and peek
209 * the next buffer in the queue. The loop will block if the
210 * inner queue is empty, releasing the lock while it waits.
211 */
212 fInnerQueue.remove();
213 while (fInnerQueue.isEmpty()) {
214 fInnerQueueNotEmpty.await();
215 }
216 fOutputBuffer = checkNotNull(fInnerQueue.peek());
217 }
218 /* Our implementation guarantees this output buffer is not empty. */
219 return checkNotNull(fOutputBuffer.peek());
220 } catch (InterruptedException e) {
221 Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$
222 throw new IllegalStateException();
223 } finally {
224 fOutputLock.unlock();
225 }
226 }
227
228 /**
229 * Returns true if the queue size is 0.
230 *
231 * @return true if the queue is empty
232 */
233 public boolean isEmpty() {
234 return (fSize.get() == 0);
235 }
236
237 /**
238 * Returns the number of elements in this queue.
239 *
240 * @return the number of elements in this queue
241 * @since 1.1
242 */
243 public int size() {
244 return fSize.get();
245 }
246
247 /**
248 * Instantiate an iterator on the complete data structure. This includes the
249 * input buffer as well as the output buffer. The elements will be returned
250 * in order from last (tail) to first (head).
251 * <p>
252 * If concurrent removals happen while the iterator is being used, it is
253 * possible for an element that was actually in the queue when the call was
254 * made to have been removed by the {@link #take} method in the meantime.
255 * However, this iterator guarantees that each element is either inside the
256 * queue OR was removed by the {@link #take} method. No element should
257 * "fall in the cracks".
258 * <p>
259 * The iterator itself is not safe to use concurrently by different threads.
260 * <p>
261 * The {@link Iterator#remove()} operation is not supported by this
262 * iterator.
263 *
264 * @return An iterator over the whole buffered queue in reverse sequence
265 */
266 @Override
267 public Iterator<T> iterator() {
268 return new Itr();
269 }
270
271 private class Itr implements Iterator<T> {
272 /*
273 * Note that the iterators of LinkedBlockingDeque and
274 * ConcurrentLinkedDeque are thread-safe, which allows iterating on them
275 * while they are being modified without having to lock the accesses.
276 *
277 * To make sure we do not "miss" any elements, we need to look through
278 * the input buffer first, then the inner queue buffers in descending
279 * order, ending with the output buffer.
280 */
281 private @Nullable T fNext = null;
282 private Iterator<T> fBufferIterator;
283 private final Iterator<Deque<T>> fQueueIterator;
284
285 Itr() {
286 fInputLock.lock();
287 try {
288 fBufferIterator = checkNotNull(fInputBuffer.descendingIterator());
289 fQueueIterator = checkNotNull(fInnerQueue.descendingIterator());
290 } finally {
291 fInputLock.unlock();
292 }
293 }
294
295 @Override
296 public boolean hasNext() {
297 if (fNext != null) {
298 return true;
299 }
300 if (fBufferIterator.hasNext()) {
301 fNext = fBufferIterator.next();
302 return true;
303 }
304 if (fQueueIterator.hasNext()) {
305 fBufferIterator = checkNotNull(fQueueIterator.next().descendingIterator());
306 return hasNext();
307 }
308 return false;
309 }
310
311 @Override
312 public T next() {
313 if (hasNext()) {
314 @Nullable T next = fNext;
315 if (next != null) {
316 fNext = null;
317 return next;
318 }
319 }
320 throw new NoSuchElementException();
321 }
322
323 @Override
324 public void remove() {
325 throw new UnsupportedOperationException();
326 }
327 }
328
329 }
This page took 0.037653 seconds and 5 git commands to generate.