Commit | Line | Data |
---|---|---|
6d758ba0 MK |
1 | /******************************************************************************* |
2 | * Copyright (c) 2015 Ericsson, EfficiOS Inc., and others | |
3 | * | |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Matthew Khouzam - Initial API and implementation | |
11 | * Alexandre Montplaisir - Initial API and implementation | |
12 | *******************************************************************************/ | |
13 | ||
14 | package org.eclipse.tracecompass.common.core.collect; | |
15 | ||
16 | import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNull; | |
17 | ||
18 | import java.util.Deque; | |
19 | import java.util.Iterator; | |
49698f83 BH |
20 | import java.util.NoSuchElementException; |
21 | import java.util.concurrent.BlockingDeque; | |
6d758ba0 MK |
22 | import java.util.concurrent.BlockingQueue; |
23 | import java.util.concurrent.ConcurrentLinkedDeque; | |
49698f83 BH |
24 | import java.util.concurrent.LinkedBlockingDeque; |
25 | import java.util.concurrent.atomic.AtomicInteger; | |
26 | import java.util.concurrent.locks.Condition; | |
6d758ba0 MK |
27 | import java.util.concurrent.locks.Lock; |
28 | import java.util.concurrent.locks.ReentrantLock; | |
29 | ||
49698f83 | 30 | import org.eclipse.jdt.annotation.Nullable; |
6d758ba0 MK |
31 | import org.eclipse.tracecompass.internal.common.core.Activator; |
32 | ||
6d758ba0 | 33 | /** |
49698f83 BH |
34 | * A BufferedBlockingQueue is a wrapper around a {@link BlockingQueue}, which |
35 | * provides input and output "buffers", so that chunks of elements are inserted | |
36 | * into the output buffer, rather than individual elements. | |
37 | * <p> | |
6d758ba0 MK |
38 | * The API provides usual put() and take() methods which work on single |
39 | * elements. This class abstracts the concept of chunking, as well as the | |
49698f83 BH |
40 | * required locking, from the users. |
41 | * <p> | |
6d758ba0 MK |
42 | * The main use case is for when different threads are doing insertion and |
43 | * removal operations. The added buffering reduces the contention between those | |
44 | * two threads. | |
45 | * | |
46 | * @param <T> | |
47 | * The data type of the elements contained by the queue | |
48 | * @since 1.0 | |
49 | */ | |
50 | public class BufferedBlockingQueue<T> implements Iterable<T> { | |
51 | ||
49698f83 | 52 | private final BlockingDeque<Deque<T>> fInnerQueue; |
6d758ba0 MK |
53 | private final Lock fInputLock = new ReentrantLock(); |
54 | private final Lock fOutputLock = new ReentrantLock(); | |
55 | private final int fChunkSize; | |
56 | ||
57 | private Deque<T> fInputBuffer; | |
58 | private Deque<T> fOutputBuffer; | |
59 | ||
60 | /* | |
61 | * ConcurrentLinkedDeque's size() method does not run in constant time. | |
62 | * Since we know we will only increment it, keep track of the number of | |
63 | * insertions ourselves. It does not matter if the actual size is not exact. | |
64 | */ | |
65 | private int fInputBufferSize; | |
66 | ||
49698f83 BH |
67 | private final AtomicInteger fSize = new AtomicInteger(0); |
68 | ||
69 | private final Condition fInnerQueueNotEmpty = checkNotNull(fOutputLock.newCondition()); | |
70 | ||
6d758ba0 MK |
71 | /** |
72 | * Constructor | |
73 | * | |
74 | * @param queueSize | |
75 | * The size of the actual blocking queue. This is the number of | |
76 | * *chunks* that will go in the queue. | |
77 | * @param chunkSize | |
78 | * The size of an individual chunk. | |
79 | */ | |
80 | public BufferedBlockingQueue(int queueSize, int chunkSize) { | |
49698f83 BH |
81 | /* Add one to the queue size for the output buffer */ |
82 | fInnerQueue = new LinkedBlockingDeque<>(queueSize + 1); | |
6d758ba0 MK |
83 | fChunkSize = chunkSize; |
84 | ||
85 | fInputBuffer = new ConcurrentLinkedDeque<>(); | |
86 | /* | |
49698f83 BH |
87 | * Create an empty output buffer to avoid a null reference, and add it |
88 | * to the queue. The output buffer is always the head of the queue. | |
6d758ba0 MK |
89 | */ |
90 | fOutputBuffer = new ConcurrentLinkedDeque<>(); | |
49698f83 | 91 | fInnerQueue.add(fOutputBuffer); |
6d758ba0 MK |
92 | } |
93 | ||
94 | /** | |
49698f83 BH |
95 | * Put an element at the tail of the queue. |
96 | * <p> | |
97 | * This method will block the caller if the output buffer is full, waiting | |
98 | * for space to become available. | |
6d758ba0 MK |
99 | * |
100 | * @param element | |
101 | * The element to insert | |
102 | */ | |
103 | public void put(T element) { | |
104 | fInputLock.lock(); | |
105 | try { | |
49698f83 BH |
106 | fInputBuffer.add(element); |
107 | fSize.incrementAndGet(); | |
6d758ba0 MK |
108 | fInputBufferSize++; |
109 | if (fInputBufferSize >= fChunkSize) { | |
110 | this.flushInputBuffer(); | |
111 | } | |
112 | } finally { | |
113 | fInputLock.unlock(); | |
114 | } | |
115 | } | |
116 | ||
117 | /** | |
118 | * Flush the current input buffer, disregarding the expected buffer size | |
119 | * limit. | |
49698f83 | 120 | * <p> |
6d758ba0 MK |
121 | * This will guarantee that an element that was inserted via the |
122 | * {@link #put} method becomes visible to the {@link #take} method. | |
123 | * | |
49698f83 BH |
124 | * This method will block if the output buffer is currently full, waiting |
125 | * for space to become available. | |
6d758ba0 MK |
126 | */ |
127 | public void flushInputBuffer() { | |
49698f83 | 128 | boolean signal = false; |
6d758ba0 MK |
129 | fInputLock.lock(); |
130 | try { | |
131 | /* | |
49698f83 | 132 | * This call blocks if the inner queue is full, effectively blocking |
6d758ba0 MK |
133 | * the caller until elements are removed via the take() method. |
134 | */ | |
135 | if (!fInputBuffer.isEmpty()) { | |
136 | fInnerQueue.put(fInputBuffer); | |
137 | fInputBuffer = new ConcurrentLinkedDeque<>(); | |
138 | fInputBufferSize = 0; | |
49698f83 | 139 | signal = true; |
6d758ba0 MK |
140 | } |
141 | ||
142 | } catch (InterruptedException e) { | |
143 | Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$ | |
144 | } finally { | |
145 | fInputLock.unlock(); | |
146 | } | |
49698f83 BH |
147 | if (signal) { |
148 | fOutputLock.lock(); | |
149 | try { | |
150 | fInnerQueueNotEmpty.signalAll(); | |
151 | } finally { | |
152 | fOutputLock.unlock(); | |
153 | } | |
154 | } | |
6d758ba0 MK |
155 | } |
156 | ||
157 | /** | |
49698f83 BH |
158 | * Retrieve the head element from the queue. |
159 | * <p> | |
160 | * If the output buffer is empty, this call will block until an element is | |
161 | * inserted and fills the input buffer, or until the not-empty input buffer | |
162 | * is otherwise manually flushed. | |
6d758ba0 MK |
163 | * |
164 | * @return The retrieved element. It will be removed from the queue. | |
165 | */ | |
166 | public T take() { | |
167 | fOutputLock.lock(); | |
168 | try { | |
169 | if (fOutputBuffer.isEmpty()) { | |
170 | /* | |
49698f83 BH |
171 | * Our read buffer is empty, remove it from the queue and peek |
172 | * the next buffer in the queue. The loop will block if the | |
173 | * inner queue is empty, releasing the lock while it waits. | |
174 | */ | |
175 | fInnerQueue.remove(); | |
176 | while (fInnerQueue.isEmpty()) { | |
177 | fInnerQueueNotEmpty.await(); | |
178 | } | |
179 | fOutputBuffer = checkNotNull(fInnerQueue.peek()); | |
180 | } | |
181 | /* Our implementation guarantees this output buffer is not empty. */ | |
182 | T element = checkNotNull(fOutputBuffer.remove()); | |
183 | fSize.decrementAndGet(); | |
184 | return element; | |
185 | } catch (InterruptedException e) { | |
186 | Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$ | |
187 | throw new IllegalStateException(); | |
188 | } finally { | |
189 | fOutputLock.unlock(); | |
190 | } | |
191 | } | |
192 | ||
193 | /** | |
194 | * Retrieve, but do not remove, the head element of this queue. | |
195 | * <p> | |
196 | * If the output buffer is empty, this call will block until an element is | |
197 | * inserted and fills the input buffer, or until the not-empty input buffer | |
198 | * is otherwise manually flushed. | |
199 | * | |
200 | * @return The head element of this queue, blocking until one is available | |
201 | * @since 1.1 | |
202 | */ | |
203 | public T blockingPeek() { | |
204 | fOutputLock.lock(); | |
205 | try { | |
206 | if (fOutputBuffer.isEmpty()) { | |
207 | /* | |
208 | * Our read buffer is empty, remove it from the queue and peek | |
209 | * the next buffer in the queue. The loop will block if the | |
210 | * inner queue is empty, releasing the lock while it waits. | |
6d758ba0 | 211 | */ |
49698f83 BH |
212 | fInnerQueue.remove(); |
213 | while (fInnerQueue.isEmpty()) { | |
214 | fInnerQueueNotEmpty.await(); | |
215 | } | |
216 | fOutputBuffer = checkNotNull(fInnerQueue.peek()); | |
6d758ba0 | 217 | } |
49698f83 BH |
218 | /* Our implementation guarantees this output buffer is not empty. */ |
219 | return checkNotNull(fOutputBuffer.peek()); | |
6d758ba0 MK |
220 | } catch (InterruptedException e) { |
221 | Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$ | |
222 | throw new IllegalStateException(); | |
223 | } finally { | |
224 | fOutputLock.unlock(); | |
225 | } | |
226 | } | |
227 | ||
228 | /** | |
49698f83 | 229 | * Returns true if the queue size is 0. |
6d758ba0 | 230 | * |
49698f83 | 231 | * @return true if the queue is empty |
6d758ba0 MK |
232 | */ |
233 | public boolean isEmpty() { | |
49698f83 | 234 | return (fSize.get() == 0); |
6d758ba0 MK |
235 | } |
236 | ||
237 | /** | |
49698f83 | 238 | * Returns the number of elements in this queue. |
6d758ba0 | 239 | * |
49698f83 BH |
240 | * @return the number of elements in this queue |
241 | * @since 1.1 | |
242 | */ | |
243 | public int size() { | |
244 | return fSize.get(); | |
245 | } | |
246 | ||
247 | /** | |
248 | * Instantiate an iterator on the complete data structure. This includes the | |
249 | * input buffer as well as the output buffer. The elements will be returned | |
250 | * in order from last (tail) to first (head). | |
251 | * <p> | |
252 | * If concurrent removals happen while the iterator is being used, it is | |
6d758ba0 MK |
253 | * possible for an element that was actually in the queue when the call was |
254 | * made to have been removed by the {@link #take} method in the meantime. | |
255 | * However, this iterator guarantees that each element is either inside the | |
256 | * queue OR was removed by the {@link #take} method. No element should | |
257 | * "fall in the cracks". | |
49698f83 BH |
258 | * <p> |
259 | * The iterator itself is not safe to use concurrently by different threads. | |
260 | * <p> | |
261 | * The {@link Iterator#remove()} operation is not supported by this | |
262 | * iterator. | |
6d758ba0 | 263 | * |
49698f83 | 264 | * @return An iterator over the whole buffered queue in reverse sequence |
6d758ba0 MK |
265 | */ |
266 | @Override | |
267 | public Iterator<T> iterator() { | |
49698f83 BH |
268 | return new Itr(); |
269 | } | |
270 | ||
271 | private class Itr implements Iterator<T> { | |
6d758ba0 | 272 | /* |
49698f83 | 273 | * Note that the iterators of LinkedBlockingDeque and |
6d758ba0 MK |
274 | * ConcurrentLinkedDeque are thread-safe, which allows iterating on them |
275 | * while they are being modified without having to lock the accesses. | |
276 | * | |
277 | * To make sure we do not "miss" any elements, we need to look through | |
49698f83 BH |
278 | * the input buffer first, then the inner queue buffers in descending |
279 | * order, ending with the output buffer. | |
6d758ba0 | 280 | */ |
49698f83 BH |
281 | private @Nullable T fNext = null; |
282 | private Iterator<T> fBufferIterator; | |
283 | private final Iterator<Deque<T>> fQueueIterator; | |
284 | ||
285 | Itr() { | |
286 | fInputLock.lock(); | |
287 | try { | |
288 | fBufferIterator = checkNotNull(fInputBuffer.descendingIterator()); | |
289 | fQueueIterator = checkNotNull(fInnerQueue.descendingIterator()); | |
290 | } finally { | |
291 | fInputLock.unlock(); | |
292 | } | |
293 | } | |
294 | ||
295 | @Override | |
296 | public boolean hasNext() { | |
297 | if (fNext != null) { | |
298 | return true; | |
299 | } | |
300 | if (fBufferIterator.hasNext()) { | |
301 | fNext = fBufferIterator.next(); | |
302 | return true; | |
303 | } | |
304 | if (fQueueIterator.hasNext()) { | |
305 | fBufferIterator = checkNotNull(fQueueIterator.next().descendingIterator()); | |
306 | return hasNext(); | |
307 | } | |
308 | return false; | |
309 | } | |
6d758ba0 | 310 | |
49698f83 BH |
311 | @Override |
312 | public T next() { | |
313 | if (hasNext()) { | |
4c4e2816 | 314 | @Nullable T next = fNext; |
49698f83 BH |
315 | if (next != null) { |
316 | fNext = null; | |
317 | return next; | |
318 | } | |
319 | } | |
320 | throw new NoSuchElementException(); | |
321 | } | |
6d758ba0 | 322 | |
49698f83 BH |
323 | @Override |
324 | public void remove() { | |
325 | throw new UnsupportedOperationException(); | |
326 | } | |
6d758ba0 MK |
327 | } |
328 | ||
329 | } |