Commit | Line | Data |
---|---|---|
6d758ba0 MK |
1 | /******************************************************************************* |
2 | * Copyright (c) 2015 Ericsson, EfficiOS Inc., and others | |
3 | * | |
4 | * All rights reserved. This program and the accompanying materials are | |
5 | * made available under the terms of the Eclipse Public License v1.0 which | |
6 | * accompanies this distribution, and is available at | |
7 | * http://www.eclipse.org/legal/epl-v10.html | |
8 | * | |
9 | * Contributors: | |
10 | * Matthew Khouzam - Initial API and implementation | |
11 | * Alexandre Montplaisir - Initial API and implementation | |
12 | *******************************************************************************/ | |
13 | ||
14 | package org.eclipse.tracecompass.common.core.collect; | |
15 | ||
16 | import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNull; | |
17 | ||
18 | import java.util.Deque; | |
19 | import java.util.Iterator; | |
20 | import java.util.concurrent.ArrayBlockingQueue; | |
21 | import java.util.concurrent.BlockingQueue; | |
22 | import java.util.concurrent.ConcurrentLinkedDeque; | |
23 | import java.util.concurrent.locks.Lock; | |
24 | import java.util.concurrent.locks.ReentrantLock; | |
25 | ||
26 | import org.eclipse.tracecompass.internal.common.core.Activator; | |
27 | ||
28 | import com.google.common.collect.Iterables; | |
29 | import com.google.common.collect.Iterators; | |
30 | ||
31 | /** | |
32 | * A BufferedBlockingQueue is a wrapper around a {@link ArrayBlockingQueue}, | |
33 | * which provides input and output "buffers", so that chunks of elements are | |
34 | * inserted into the queue, rather than individual elements. | |
35 | * | |
36 | * The API provides usual put() and take() methods which work on single | |
37 | * elements. This class abstracts the concept of chunking, as well as the | |
38 | * required locking, from the users | |
39 | * | |
40 | * The main use case is for when different threads are doing insertion and | |
41 | * removal operations. The added buffering reduces the contention between those | |
42 | * two threads. | |
43 | * | |
44 | * @param <T> | |
45 | * The data type of the elements contained by the queue | |
46 | * @since 1.0 | |
47 | */ | |
48 | public class BufferedBlockingQueue<T> implements Iterable<T> { | |
49 | ||
50 | private final BlockingQueue<Deque<T>> fInnerQueue; | |
51 | private final Lock fInputLock = new ReentrantLock(); | |
52 | private final Lock fOutputLock = new ReentrantLock(); | |
53 | private final int fChunkSize; | |
54 | ||
55 | private Deque<T> fInputBuffer; | |
56 | private Deque<T> fOutputBuffer; | |
57 | ||
58 | /* | |
59 | * ConcurrentLinkedDeque's size() method does not run in constant time. | |
60 | * Since we know we will only increment it, keep track of the number of | |
61 | * insertions ourselves. It does not matter if the actual size is not exact. | |
62 | */ | |
63 | private int fInputBufferSize; | |
64 | ||
65 | /** | |
66 | * Constructor | |
67 | * | |
68 | * @param queueSize | |
69 | * The size of the actual blocking queue. This is the number of | |
70 | * *chunks* that will go in the queue. | |
71 | * @param chunkSize | |
72 | * The size of an individual chunk. | |
73 | */ | |
74 | public BufferedBlockingQueue(int queueSize, int chunkSize) { | |
75 | fInnerQueue = new ArrayBlockingQueue<>(queueSize); | |
76 | fChunkSize = chunkSize; | |
77 | ||
78 | fInputBuffer = new ConcurrentLinkedDeque<>(); | |
79 | /* | |
80 | * Set fOutputBuffer to something to avoid a null reference, even though | |
81 | * this particular object will never be used. | |
82 | */ | |
83 | fOutputBuffer = new ConcurrentLinkedDeque<>(); | |
84 | } | |
85 | ||
86 | /** | |
87 | * Put an element into the queue. | |
88 | * | |
89 | * This method will block the caller if the inner queue is full, waiting for | |
90 | * space to become available. | |
91 | * | |
92 | * @param element | |
93 | * The element to insert | |
94 | */ | |
95 | public void put(T element) { | |
96 | fInputLock.lock(); | |
97 | try { | |
98 | fInputBuffer.addFirst(element); | |
99 | fInputBufferSize++; | |
100 | if (fInputBufferSize >= fChunkSize) { | |
101 | this.flushInputBuffer(); | |
102 | } | |
103 | } finally { | |
104 | fInputLock.unlock(); | |
105 | } | |
106 | } | |
107 | ||
108 | /** | |
109 | * Flush the current input buffer, disregarding the expected buffer size | |
110 | * limit. | |
111 | * | |
112 | * This will guarantee that an element that was inserted via the | |
113 | * {@link #put} method becomes visible to the {@link #take} method. | |
114 | * | |
115 | * This method will block if the inner queue is currently full, waiting for | |
116 | * space to become available. | |
117 | */ | |
118 | public void flushInputBuffer() { | |
119 | fInputLock.lock(); | |
120 | try { | |
121 | /* | |
122 | * This call blocks if fInputBuffer is full, effectively blocking | |
123 | * the caller until elements are removed via the take() method. | |
124 | */ | |
125 | if (!fInputBuffer.isEmpty()) { | |
126 | fInnerQueue.put(fInputBuffer); | |
127 | fInputBuffer = new ConcurrentLinkedDeque<>(); | |
128 | fInputBufferSize = 0; | |
129 | } | |
130 | ||
131 | } catch (InterruptedException e) { | |
132 | Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$ | |
133 | } finally { | |
134 | fInputLock.unlock(); | |
135 | } | |
136 | } | |
137 | ||
138 | /** | |
139 | * Retrieve an element from the queue. | |
140 | * | |
141 | * If the queue is empty, this call will block until an element is inserted. | |
142 | * | |
143 | * @return The retrieved element. It will be removed from the queue. | |
144 | */ | |
145 | public T take() { | |
146 | fOutputLock.lock(); | |
147 | try { | |
148 | if (fOutputBuffer.isEmpty()) { | |
149 | /* | |
150 | * Our read buffer is empty, take the next buffer in the queue. | |
151 | * This call will block if the inner queue is empty. | |
152 | */ | |
153 | fOutputBuffer = checkNotNull(fInnerQueue.take()); | |
154 | } | |
155 | return checkNotNull(fOutputBuffer.removeLast()); | |
156 | } catch (InterruptedException e) { | |
157 | Activator.instance().logError("Buffered queue interrupted", e); //$NON-NLS-1$ | |
158 | throw new IllegalStateException(); | |
159 | } finally { | |
160 | fOutputLock.unlock(); | |
161 | } | |
162 | } | |
163 | ||
164 | /** | |
165 | * Does the queue contain at least one element? | |
166 | * | |
167 | * @return if the queue is empty | |
168 | */ | |
169 | public boolean isEmpty() { | |
170 | /* | |
171 | * All three isEmpty()s are very fast, but we are hoping it | |
172 | * short-circuits on the first two since it would not make sense to have | |
173 | * an empty front and back and a full middle. | |
174 | */ | |
175 | return (fInputBuffer.isEmpty() && fOutputBuffer.isEmpty() && fInnerQueue.isEmpty()); | |
176 | } | |
177 | ||
178 | /** | |
179 | * Instantiate an iterator on the complete data structure. This includes the | |
180 | * inner queue as well as the input and output buffers. | |
181 | * | |
182 | * If concurrent insertions happen while the iterator is being used, it is | |
183 | * possible for an element that was actually in the queue when the call was | |
184 | * made to have been removed by the {@link #take} method in the meantime. | |
185 | * However, this iterator guarantees that each element is either inside the | |
186 | * queue OR was removed by the {@link #take} method. No element should | |
187 | * "fall in the cracks". | |
188 | * | |
189 | * @return An iterator over the whole buffered queue | |
190 | */ | |
191 | @Override | |
192 | public Iterator<T> iterator() { | |
193 | /* | |
194 | * Note that the iterators of ArrayBlockingQueue and | |
195 | * ConcurrentLinkedDeque are thread-safe, which allows iterating on them | |
196 | * while they are being modified without having to lock the accesses. | |
197 | * | |
198 | * To make sure we do not "miss" any elements, we need to look through | |
199 | * the input buffer first, then the inner queue, then the output buffer. | |
200 | */ | |
201 | ||
202 | Iterator<T> inputIterator = fInputBuffer.iterator(); | |
203 | Iterator<T> queueIterator = Iterables.concat(fInnerQueue).iterator(); | |
204 | Iterator<T> outputIterator = fOutputBuffer.iterator(); | |
205 | ||
206 | return checkNotNull(Iterators.concat(inputIterator, queueIterator, outputIterator)); | |
207 | } | |
208 | ||
209 | } |