7d24f5063a397f9450bd2716bfdd08512ff356ed
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.component;
14
15 import java.util.Vector;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.SynchronousQueue;
19
20 import org.eclipse.linuxtools.tmf.Tracer;
21 import org.eclipse.linuxtools.tmf.event.TmfData;
22 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
23 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType;
24 import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
25 import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
26 import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
27 import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
28 import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
29 import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
30 import org.eclipse.linuxtools.tmf.trace.ITmfContext;
31
32 /**
33 * <b><u>TmfProvider</u></b>
34 * <p>
35 * The TmfProvider<T> is a provider for a data of type <T>.
36 * <p>
37 * This abstract class implements the housekeeking methods to register/
38 * deregister the event provider and to handle generically the event requests.
39 * <p>
40 * The concrete class can either re-implement processRequest() entirely or
41 * just implement the hooks (initializeContext() and getNext()).
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 */
45 public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
46
47 // ------------------------------------------------------------------------
48 // Constants
49 // ------------------------------------------------------------------------
50
51 // private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
52 // private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
53
54 // ------------------------------------------------------------------------
55 //
56 // ------------------------------------------------------------------------
57
58 final protected Class<T> fType;
59 final protected boolean fLogData;
60 final protected boolean fLogError;
61
62 public static final int DEFAULT_BLOCK_SIZE = 50000;
63 public static final int DEFAULT_QUEUE_SIZE = 1000;
64
65 protected final int fQueueSize;
66 protected final BlockingQueue<T> fDataQueue;
67 protected final TmfRequestExecutor fExecutor;
68
69 private int fSignalDepth = 0;
70 private final Object fLock = new Object();
71
72 private int fRequestPendingCounter = 0;
73
74 // ------------------------------------------------------------------------
75 // Constructors
76 // ------------------------------------------------------------------------
77
78 public TmfDataProvider(String name, Class<T> type) {
79 this(name, type, DEFAULT_QUEUE_SIZE);
80 }
81
82 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
83 super(name);
84 fType = type;
85 fQueueSize = queueSize;
86 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
87
88 fExecutor = new TmfRequestExecutor();
89 fSignalDepth = 0;
90
91 fLogData = Tracer.isEventTraced();
92 fLogError = Tracer.isErrorTraced();
93
94 TmfProviderManager.register(fType, this);
95 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
96 }
97
98 public TmfDataProvider(TmfDataProvider<T> other) {
99 super(other);
100 fType = other.fType;
101 fQueueSize = other.fQueueSize;
102 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
103
104 fExecutor = new TmfRequestExecutor();
105 fSignalDepth = 0;
106
107 fLogData = Tracer.isEventTraced();
108 fLogError = Tracer.isErrorTraced();
109 }
110
111 @Override
112 public void dispose() {
113 TmfProviderManager.deregister(fType, this);
114 fExecutor.stop();
115 super.dispose();
116 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
117 }
118
119 public int getQueueSize() {
120 return fQueueSize;
121 }
122
123 public Class<?> getType() {
124 return fType;
125 }
126
127 // ------------------------------------------------------------------------
128 // ITmfRequestHandler
129 // ------------------------------------------------------------------------
130
131 @Override
132 public void sendRequest(final ITmfDataRequest<T> request) {
133 synchronized(fLock) {
134 if (fSignalDepth > 0) {
135 coalesceDataRequest(request);
136 } else {
137 dispatchRequest(request);
138 }
139 }
140 }
141
142 /**
143 * This method queues the coalesced requests.
144 *
145 * @param thread
146 */
147 @Override
148 public void fireRequest() {
149 synchronized(fLock) {
150 if (fRequestPendingCounter > 0) {
151 return;
152 }
153 if (fPendingCoalescedRequests.size() > 0) {
154 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
155 dispatchRequest(request);
156 }
157 fPendingCoalescedRequests.clear();
158 }
159 }
160 }
161
162 /**
163 * Increments/decrements the pending requests counters and fires
164 * the request if necessary (counter == 0). Used for coalescing
165 * requests accross multiple TmfDataProvider.
166 *
167 * @param isIncrement
168 */
169 @Override
170 public void notifyPendingRequest(boolean isIncrement) {
171 synchronized(fLock) {
172 if (isIncrement) {
173 if (fSignalDepth > 0) {
174 fRequestPendingCounter++;
175 }
176 } else {
177 if (fRequestPendingCounter > 0) {
178 fRequestPendingCounter--;
179 }
180
181 // fire request if all pending requests are received
182 if (fRequestPendingCounter == 0) {
183 fireRequest();
184 }
185 }
186 }
187 }
188
189 // ------------------------------------------------------------------------
190 // Coalescing (primitive test...)
191 // ------------------------------------------------------------------------
192
193 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
194
195 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
196 synchronized(fLock) {
197 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(
198 fType, request.getIndex(), request.getNbRequested(),request.getExecType());
199 coalescedRequest.addRequest(request);
200 if (Tracer.isRequestTraced()) {
201 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
202 Tracer.traceRequest(coalescedRequest, "added " + request.getRequestId()); //$NON-NLS-1$
203 }
204 fPendingCoalescedRequests.add(coalescedRequest);
205 }
206 }
207
208 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
209 synchronized(fLock) {
210 for (TmfCoalescedDataRequest<T> req : fPendingCoalescedRequests) {
211 if (req.isCompatible(request)) {
212 req.addRequest(request);
213 if (Tracer.isRequestTraced()) {
214 Tracer.traceRequest(request, "coalesced with " + req.getRequestId()); //$NON-NLS-1$
215 Tracer.traceRequest(req, "added " + request.getRequestId()); //$NON-NLS-1$
216 }
217 return;
218 }
219 }
220 newCoalescedDataRequest(request);
221 }
222 }
223
224 // ------------------------------------------------------------------------
225 // Request processing
226 // ------------------------------------------------------------------------
227
228 private void dispatchRequest(final ITmfDataRequest<T> request) {
229 if (request.getExecType() == ExecutionType.FOREGROUND)
230 queueRequest(request);
231 else
232 queueBackgroundRequest(request, DEFAULT_BLOCK_SIZE, true);
233 }
234
235 protected void queueRequest(final ITmfDataRequest<T> request) {
236
237 if (fExecutor.isShutdown()) {
238 request.cancel();
239 return;
240 }
241
242 final TmfDataProvider<T> provider = this;
243
244 // Process the request
245 TmfThread thread = new TmfThread(request.getExecType()) {
246
247 @Override
248 public void run() {
249
250 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "started"); //$NON-NLS-1$
251
252 // Extract the generic information
253 request.start();
254 int nbRequested = request.getNbRequested();
255 int nbRead = 0;
256
257 // Initialize the execution
258 ITmfContext context = armRequest(request);
259 if (context == null) {
260 request.cancel();
261 return;
262 }
263
264 try {
265 // Get the ordered events
266 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName()); //$NON-NLS-1$//$NON-NLS-2$
267 T data = getNext(context);
268 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event"); //$NON-NLS-1$ //$NON-NLS-2$
269 while (data != null && !isCompleted(request, data, nbRead))
270 {
271 if (fLogData) Tracer.traceEvent(provider, request, data);
272 request.handleData(data);
273
274 // To avoid an unnecessary read passed the last data requested
275 if (++nbRead < nbRequested) {
276 data = getNext(context);
277 if (Tracer.isRequestTraced() && (data == null || data.isNullRef())) {
278 Tracer.trace("Request #" + request.getRequestId() + " end of data"); //$NON-NLS-1$//$NON-NLS-2$
279 }
280 }
281 }
282
283 if (request.isCancelled()) {
284 request.cancel();
285 }
286 else {
287 request.done();
288 }
289
290 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "completed"); //$NON-NLS-1$
291 }
292 catch (Exception e) {
293 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "exception (failed)"); //$NON-NLS-1$
294 request.fail();
295 }
296 }
297 };
298
299 fExecutor.execute(thread);
300
301 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued"); //$NON-NLS-1$
302 }
303
304 // By default, same behavior as a foreground request
305 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, boolean indexing) {
306 queueRequest(request);
307 }
308
309 /**
310 * Initialize the provider based on the request. The context is
311 * provider specific and will be updated by getNext().
312 *
313 * @param request
314 * @return an application specific context; null if request can't be serviced
315 */
316 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
317 public abstract T getNext(ITmfContext context);
318
319 /**
320 * Checks if the data meets the request completion criteria.
321 *
322 * @param request
323 * @param data
324 * @return
325 */
326 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
327 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
328 }
329
330 // ------------------------------------------------------------------------
331 // Signal handlers
332 // ------------------------------------------------------------------------
333
334 @TmfSignalHandler
335 public void startSynch(TmfStartSynchSignal signal) {
336 synchronized (fLock) {
337 fSignalDepth++;
338 }
339 }
340
341 @TmfSignalHandler
342 public void endSynch(TmfEndSynchSignal signal) {
343 synchronized (fLock) {
344 fSignalDepth--;
345 if (fSignalDepth == 0) {
346 fireRequest();
347 }
348 }
349 }
350
351 }
This page took 0.054885 seconds and 4 git commands to generate.