1 /*******************************************************************************
2 * Copyright (c) 2015 Ericsson, EfficiOS Inc., and others
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Matthew Khouzam - Initial API and implementation
11 * Alexandre Montplaisir - Initial API and implementation
12 *******************************************************************************/
14 package org
.eclipse
.tracecompass
.common
.core
.collect
;
16 import static org
.eclipse
.tracecompass
.common
.core
.NonNullUtils
.checkNotNull
;
18 import java
.util
.Deque
;
19 import java
.util
.Iterator
;
20 import java
.util
.concurrent
.ArrayBlockingQueue
;
21 import java
.util
.concurrent
.BlockingQueue
;
22 import java
.util
.concurrent
.ConcurrentLinkedDeque
;
23 import java
.util
.concurrent
.locks
.Lock
;
24 import java
.util
.concurrent
.locks
.ReentrantLock
;
26 import org
.eclipse
.tracecompass
.internal
.common
.core
.Activator
;
28 import com
.google
.common
.collect
.Iterables
;
29 import com
.google
.common
.collect
.Iterators
;
32 * A BufferedBlockingQueue is a wrapper around a {@link ArrayBlockingQueue},
33 * which provides input and output "buffers", so that chunks of elements are
34 * inserted into the queue, rather than individual elements.
36 * The API provides usual put() and take() methods which work on single
37 * elements. This class abstracts the concept of chunking, as well as the
38 * required locking, from the users
40 * The main use case is for when different threads are doing insertion and
41 * removal operations. The added buffering reduces the contention between those
45 * The data type of the elements contained by the queue
48 public class BufferedBlockingQueue
<T
> implements Iterable
<T
> {
50 private final BlockingQueue
<Deque
<T
>> fInnerQueue
;
51 private final Lock fInputLock
= new ReentrantLock();
52 private final Lock fOutputLock
= new ReentrantLock();
53 private final int fChunkSize
;
55 private Deque
<T
> fInputBuffer
;
56 private Deque
<T
> fOutputBuffer
;
59 * ConcurrentLinkedDeque's size() method does not run in constant time.
60 * Since we know we will only increment it, keep track of the number of
61 * insertions ourselves. It does not matter if the actual size is not exact.
63 private int fInputBufferSize
;
69 * The size of the actual blocking queue. This is the number of
70 * *chunks* that will go in the queue.
72 * The size of an individual chunk.
74 public BufferedBlockingQueue(int queueSize
, int chunkSize
) {
75 fInnerQueue
= new ArrayBlockingQueue
<>(queueSize
);
76 fChunkSize
= chunkSize
;
78 fInputBuffer
= new ConcurrentLinkedDeque
<>();
80 * Set fOutputBuffer to something to avoid a null reference, even though
81 * this particular object will never be used.
83 fOutputBuffer
= new ConcurrentLinkedDeque
<>();
87 * Put an element into the queue.
89 * This method will block the caller if the inner queue is full, waiting for
90 * space to become available.
93 * The element to insert
95 public void put(T element
) {
98 fInputBuffer
.addFirst(element
);
100 if (fInputBufferSize
>= fChunkSize
) {
101 this.flushInputBuffer();
109 * Flush the current input buffer, disregarding the expected buffer size
112 * This will guarantee that an element that was inserted via the
113 * {@link #put} method becomes visible to the {@link #take} method.
115 * This method will block if the inner queue is currently full, waiting for
116 * space to become available.
118 public void flushInputBuffer() {
122 * This call blocks if fInputBuffer is full, effectively blocking
123 * the caller until elements are removed via the take() method.
125 if (!fInputBuffer
.isEmpty()) {
126 fInnerQueue
.put(fInputBuffer
);
127 fInputBuffer
= new ConcurrentLinkedDeque
<>();
128 fInputBufferSize
= 0;
131 } catch (InterruptedException e
) {
132 Activator
.instance().logError("Buffered queue interrupted", e
); //$NON-NLS-1$
139 * Retrieve an element from the queue.
141 * If the queue is empty, this call will block until an element is inserted.
143 * @return The retrieved element. It will be removed from the queue.
148 if (fOutputBuffer
.isEmpty()) {
150 * Our read buffer is empty, take the next buffer in the queue.
151 * This call will block if the inner queue is empty.
153 fOutputBuffer
= checkNotNull(fInnerQueue
.take());
155 return checkNotNull(fOutputBuffer
.removeLast());
156 } catch (InterruptedException e
) {
157 Activator
.instance().logError("Buffered queue interrupted", e
); //$NON-NLS-1$
158 throw new IllegalStateException();
160 fOutputLock
.unlock();
165 * Does the queue contain at least one element?
167 * @return if the queue is empty
169 public boolean isEmpty() {
171 * All three isEmpty()s are very fast, but we are hoping it
172 * short-circuits on the first two since it would not make sense to have
173 * an empty front and back and a full middle.
175 return (fInputBuffer
.isEmpty() && fOutputBuffer
.isEmpty() && fInnerQueue
.isEmpty());
179 * Instantiate an iterator on the complete data structure. This includes the
180 * inner queue as well as the input and output buffers.
182 * If concurrent insertions happen while the iterator is being used, it is
183 * possible for an element that was actually in the queue when the call was
184 * made to have been removed by the {@link #take} method in the meantime.
185 * However, this iterator guarantees that each element is either inside the
186 * queue OR was removed by the {@link #take} method. No element should
187 * "fall in the cracks".
189 * @return An iterator over the whole buffered queue
192 public Iterator
<T
> iterator() {
194 * Note that the iterators of ArrayBlockingQueue and
195 * ConcurrentLinkedDeque are thread-safe, which allows iterating on them
196 * while they are being modified without having to lock the accesses.
198 * To make sure we do not "miss" any elements, we need to look through
199 * the input buffer first, then the inner queue, then the output buffer.
202 Iterator
<T
> inputIterator
= fInputBuffer
.iterator();
203 Iterator
<T
> queueIterator
= Iterables
.concat(fInnerQueue
).iterator();
204 Iterator
<T
> outputIterator
= fOutputBuffer
.iterator();
206 return checkNotNull(Iterators
.concat(inputIterator
, queueIterator
, outputIterator
));
This page took 0.035148 seconds and 5 git commands to generate.