gdbtrace: Move plugins to their own sub-directory
[deliverable/tracecompass.git] / org.eclipse.tracecompass.common.core / src / org / eclipse / tracecompass / common / core / collect / BufferedBlockingQueue.java
1 /*******************************************************************************
2 * Copyright (c) 2015 Ericsson, EfficiOS Inc., and others
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Matthew Khouzam - Initial API and implementation
11 * Alexandre Montplaisir - Initial API and implementation
12 *******************************************************************************/
13
14 package org.eclipse.tracecompass.common.core.collect;
15
16 import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNull;
17
18 import java.util.Deque;
19 import java.util.Iterator;
20 import java.util.concurrent.ArrayBlockingQueue;
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.ConcurrentLinkedDeque;
23 import java.util.concurrent.locks.Lock;
24 import java.util.concurrent.locks.ReentrantLock;
25
26 import org.eclipse.tracecompass.internal.common.core.Activator;
27
28 import com.google.common.collect.Iterables;
29 import com.google.common.collect.Iterators;
30
31 /**
32 * A BufferedBlockingQueue is a wrapper around a {@link ArrayBlockingQueue},
33 * which provides input and output "buffers", so that chunks of elements are
34 * inserted into the queue, rather than individual elements.
35 *
36 * The API provides usual put() and take() methods which work on single
37 * elements. This class abstracts the concept of chunking, as well as the
38 * required locking, from the users
39 *
40 * The main use case is for when different threads are doing insertion and
41 * removal operations. The added buffering reduces the contention between those
42 * two threads.
43 *
44 * @param <T>
45 * The data type of the elements contained by the queue
46 * @since 1.0
47 */
48 public class BufferedBlockingQueue<T> implements Iterable<T> {
49
50 private final BlockingQueue<Deque<T>> fInnerQueue;
51 private final Lock fInputLock = new ReentrantLock();
52 private final Lock fOutputLock = new ReentrantLock();
53 private final int fChunkSize;
54
55 private Deque<T> fInputBuffer;
56 private Deque<T> fOutputBuffer;
57
58 /*
59 * ConcurrentLinkedDeque's size() method does not run in constant time.
60 * Since we know we will only increment it, keep track of the number of
61 * insertions ourselves. It does not matter if the actual size is not exact.
62 */
63 private int fInputBufferSize;
64
65 /**
66 * Constructor
67 *
68 * @param queueSize
69 * The size of the actual blocking queue. This is the number of
70 * *chunks* that will go in the queue.
71 * @param chunkSize
72 * The size of an individual chunk.
73 */
74 public BufferedBlockingQueue(int queueSize, int chunkSize) {
75 fInnerQueue = new ArrayBlockingQueue<>(queueSize);
76 fChunkSize = chunkSize;
77
78 fInputBuffer = new ConcurrentLinkedDeque<>();
79 /*
80 * Set fOutputBuffer to something to avoid a null reference, even though
81 * this particular object will never be used.
82 */
83 fOutputBuffer = new ConcurrentLinkedDeque<>();
84 }
85
86 /**
87 * Put an element into the queue.
88 *
89 * This method will block the caller if the inner queue is full, waiting for
90 * space to become available.
91 *
92 * @param element
93 * The element to insert
94 */
95 public void put(T element) {
96 fInputLock.lock();
97 try {
98 fInputBuffer.addFirst(element);
99 fInputBufferSize++;
100 if (fInputBufferSize >= fChunkSize) {
101 this.flushInputBuffer();
102 }
103 } finally {
104 fInputLock.unlock();
105 }
106 }
107
108 /**
109 * Flush the current input buffer, disregarding the expected buffer size
110 * limit.
111 *
112 * This will guarantee that an element that was inserted via the
113 * {@link #put} method becomes visible to the {@link #take} method.
114 *
115 * This method will block if the inner queue is currently full, waiting for
116 * space to become available.
117 */
118 public void flushInputBuffer() {
119 fInputLock.lock();
120 try {
121 /*
122 * This call blocks if fInputBuffer is full, effectively blocking
123 * the caller until elements are removed via the take() method.
124 */
125 if (!fInputBuffer.isEmpty()) {
126 fInnerQueue.put(fInputBuffer);
127 fInputBuffer = new ConcurrentLinkedDeque<>();
128 fInputBufferSize = 0;
129 }
130
131 } catch (InterruptedException e) {
132 Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$
133 } finally {
134 fInputLock.unlock();
135 }
136 }
137
138 /**
139 * Retrieve an element from the queue.
140 *
141 * If the queue is empty, this call will block until an element is inserted.
142 *
143 * @return The retrieved element. It will be removed from the queue.
144 */
145 public T take() {
146 fOutputLock.lock();
147 try {
148 if (fOutputBuffer.isEmpty()) {
149 /*
150 * Our read buffer is empty, take the next buffer in the queue.
151 * This call will block if the inner queue is empty.
152 */
153 fOutputBuffer = checkNotNull(fInnerQueue.take());
154 }
155 return checkNotNull(fOutputBuffer.removeLast());
156 } catch (InterruptedException e) {
157 Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$
158 throw new IllegalStateException();
159 } finally {
160 fOutputLock.unlock();
161 }
162 }
163
164 /**
165 * Does the queue contain at least one element?
166 *
167 * @return if the queue is empty
168 */
169 public boolean isEmpty() {
170 /*
171 * All three isEmpty()s are very fast, but we are hoping it
172 * short-circuits on the first two since it would not make sense to have
173 * an empty front and back and a full middle.
174 */
175 return (fInputBuffer.isEmpty() && fOutputBuffer.isEmpty() && fInnerQueue.isEmpty());
176 }
177
178 /**
179 * Instantiate an iterator on the complete data structure. This includes the
180 * inner queue as well as the input and output buffers.
181 *
182 * If concurrent insertions happen while the iterator is being used, it is
183 * possible for an element that was actually in the queue when the call was
184 * made to have been removed by the {@link #take} method in the meantime.
185 * However, this iterator guarantees that each element is either inside the
186 * queue OR was removed by the {@link #take} method. No element should
187 * "fall in the cracks".
188 *
189 * @return An iterator over the whole buffered queue
190 */
191 @Override
192 public Iterator<T> iterator() {
193 /*
194 * Note that the iterators of ArrayBlockingQueue and
195 * ConcurrentLinkedDeque are thread-safe, which allows iterating on them
196 * while they are being modified without having to lock the accesses.
197 *
198 * To make sure we do not "miss" any elements, we need to look through
199 * the input buffer first, then the inner queue, then the output buffer.
200 */
201
202 Iterator<T> inputIterator = fInputBuffer.iterator();
203 Iterator<T> queueIterator = Iterables.concat(fInnerQueue).iterator();
204 Iterator<T> outputIterator = fOutputBuffer.iterator();
205
206 return checkNotNull(Iterators.concat(inputIterator, queueIterator, outputIterator));
207 }
208
209 }
This page took 0.035148 seconds and 5 git commands to generate.