Contribution for Bug353020
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / component / TmfDataProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.component;
14
15 import java.util.Vector;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.SynchronousQueue;
19
20 import org.eclipse.linuxtools.tmf.Tracer;
21 import org.eclipse.linuxtools.tmf.event.TmfData;
22 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
23 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType;
24 import org.eclipse.linuxtools.tmf.request.TmfCoalescedDataRequest;
25 import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
26 import org.eclipse.linuxtools.tmf.request.TmfRequestExecutor;
27 import org.eclipse.linuxtools.tmf.signal.TmfEndSynchSignal;
28 import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
29 import org.eclipse.linuxtools.tmf.signal.TmfStartSynchSignal;
30 import org.eclipse.linuxtools.tmf.trace.ITmfContext;
31
32 /**
33 * <b><u>TmfProvider</u></b>
34 * <p>
35 * The TmfProvider<T> is a provider for a data of type <T>.
36 * <p>
37 * This abstract class implements the housekeeking methods to register/
38 * deregister the event provider and to handle generically the event requests.
39 * <p>
40 * The concrete class can either re-implement processRequest() entirely or
41 * just implement the hooks (initializeContext() and getNext()).
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 */
45 public abstract class TmfDataProvider<T extends TmfData> extends TmfComponent implements ITmfDataProvider<T> {
46
47 // ------------------------------------------------------------------------
48 // Constants
49 // ------------------------------------------------------------------------
50
51 // private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
52 // private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
53
54 // ------------------------------------------------------------------------
55 //
56 // ------------------------------------------------------------------------
57
58 final protected Class<T> fType;
59 final protected boolean fLogData;
60 final protected boolean fLogError;
61
62 public static final int DEFAULT_BLOCK_SIZE = 50000;
63 public static final int DEFAULT_QUEUE_SIZE = 1000;
64
65 protected final int fQueueSize;
66 protected final BlockingQueue<T> fDataQueue;
67 protected final TmfRequestExecutor fExecutor;
68
69 private int fSignalDepth = 0;
70 private final Object fLock = new Object();
71
72 private int fRequestPendingCounter = 0;
73
74 // ------------------------------------------------------------------------
75 // Constructors
76 // ------------------------------------------------------------------------
77
78 public TmfDataProvider(String name, Class<T> type) {
79 this(name, type, DEFAULT_QUEUE_SIZE);
80 }
81
82 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
83 super(name);
84 fType = type;
85 fQueueSize = queueSize;
86 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
87
88 fExecutor = new TmfRequestExecutor();
89 fSignalDepth = 0;
90
91 fLogData = Tracer.isEventTraced();
92 fLogError = Tracer.isErrorTraced();
93
94 TmfProviderManager.register(fType, this);
95 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
96 }
97
98 public TmfDataProvider(TmfDataProvider<T> other) {
99 super(other);
100 fType = other.fType;
101 fQueueSize = other.fQueueSize;
102 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
103
104 fExecutor = new TmfRequestExecutor();
105 fSignalDepth = 0;
106
107 fLogData = Tracer.isEventTraced();
108 fLogError = Tracer.isErrorTraced();
109 }
110
111 @Override
112 public void dispose() {
113 TmfProviderManager.deregister(fType, this);
114 fExecutor.stop();
115 super.dispose();
116 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
117 }
118
119 public int getQueueSize() {
120 return fQueueSize;
121 }
122
123 public Class<?> getType() {
124 return fType;
125 }
126
127 // ------------------------------------------------------------------------
128 // ITmfRequestHandler
129 // ------------------------------------------------------------------------
130
131 @Override
132 public void sendRequest(final ITmfDataRequest<T> request) {
133 synchronized(fLock) {
134 if (fSignalDepth > 0) {
135 coalesceDataRequest(request);
136 } else {
137 dispatchRequest(request);
138 }
139 }
140 }
141
142 /**
143 * This method queues the coalesced requests.
144 *
145 * @param thread
146 */
147 @Override
148 public void fireRequest() {
149 synchronized(fLock) {
150 if (fRequestPendingCounter > 0) {
151 return;
152 }
153 if (fPendingCoalescedRequests.size() > 0) {
154 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
155 dispatchRequest(request);
156 }
157 fPendingCoalescedRequests.clear();
158 }
159 }
160 }
161
162 /**
163 * Increments/decrements the pending requests counters and fires
164 * the request if necessary (counter == 0). Used for coalescing
165 * requests accross multiple TmfDataProvider.
166 *
167 * @param isIncrement
168 */
169 @Override
170 public void notifyPendingRequest(boolean isIncrement) {
171 synchronized(fLock) {
172 if (isIncrement) {
173 if (fSignalDepth > 0) {
174 fRequestPendingCounter++;
175 }
176 } else {
177 if (fRequestPendingCounter > 0) {
178 fRequestPendingCounter--;
179 }
180
181 // fire request if all pending requests are received
182 if (fRequestPendingCounter == 0) {
183 fireRequest();
184 }
185 }
186 }
187 }
188
189 // ------------------------------------------------------------------------
190 // Coalescing (primitive test...)
191 // ------------------------------------------------------------------------
192
193 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
194
195 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
196 synchronized(fLock) {
197 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(
198 fType, request.getIndex(), request.getNbRequested(), request.getBlockSize(), request.getExecType());
199 coalescedRequest.addRequest(request);
200 if (Tracer.isRequestTraced()) {
201 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
202 }
203 fPendingCoalescedRequests.add(coalescedRequest);
204 }
205 }
206
207 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
208 synchronized(fLock) {
209 for (TmfCoalescedDataRequest<T> coalescedRequest : fPendingCoalescedRequests) {
210 if (coalescedRequest.isCompatible(request)) {
211 coalescedRequest.addRequest(request);
212 if (Tracer.isRequestTraced()) {
213 Tracer.traceRequest(request, "coalesced with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
214 }
215 return;
216 }
217 }
218 newCoalescedDataRequest(request);
219 }
220 }
221
222 // ------------------------------------------------------------------------
223 // Request processing
224 // ------------------------------------------------------------------------
225
226 private void dispatchRequest(final ITmfDataRequest<T> request) {
227 if (request.getExecType() == ExecutionType.FOREGROUND)
228 queueRequest(request);
229 else
230 queueBackgroundRequest(request, request.getBlockSize(), true);
231 }
232
233 protected void queueRequest(final ITmfDataRequest<T> request) {
234
235 if (fExecutor.isShutdown()) {
236 request.cancel();
237 return;
238 }
239
240 final TmfDataProvider<T> provider = this;
241
242 // Process the request
243 TmfThread thread = new TmfThread(request.getExecType()) {
244
245 @Override
246 public void run() {
247
248 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " is being serviced by " + provider.getName()); //$NON-NLS-1$//$NON-NLS-2$
249
250 // Extract the generic information
251 request.start();
252 int nbRequested = request.getNbRequested();
253 int nbRead = 0;
254
255 // Initialize the execution
256 ITmfContext context = armRequest(request);
257 if (context == null) {
258 request.cancel();
259 return;
260 }
261
262 try {
263 // Get the ordered events
264 T data = getNext(context);
265 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " read first event"); //$NON-NLS-1$ //$NON-NLS-2$
266 while (data != null && !isCompleted(request, data, nbRead))
267 {
268 if (fLogData) Tracer.traceEvent(provider, request, data);
269 request.handleData(data);
270
271 // To avoid an unnecessary read passed the last data requested
272 if (++nbRead < nbRequested) {
273 data = getNext(context);
274 }
275 }
276 if (Tracer.isRequestTraced()) Tracer.trace("Request #" + request.getRequestId() + " finished"); //$NON-NLS-1$//$NON-NLS-2$
277
278 if (request.isCancelled()) {
279 request.cancel();
280 }
281 else {
282 request.done();
283 }
284 }
285 catch (Exception e) {
286 request.fail();
287 }
288 }
289 };
290
291 if (Tracer.isRequestTraced()) Tracer.traceRequest(request, "queued"); //$NON-NLS-1$
292 fExecutor.execute(thread);
293
294 }
295
296 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean indexing) {
297
298 Thread thread = new Thread() {
299 @Override
300 public void run() {
301 request.start();
302
303 final Integer[] CHUNK_SIZE = new Integer[1];
304 CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
305
306 final Integer[] nbRead = new Integer[1];
307 nbRead[0] = 0;
308
309 final Boolean[] isFinished = new Boolean[1];
310 isFinished[0] = Boolean.FALSE;
311
312 while (!isFinished[0]) {
313
314 TmfDataRequest<T> subRequest= new TmfDataRequest<T>(request.getDataType(), request.getIndex() + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND)
315 {
316 @Override
317 public void handleData(T data) {
318 super.handleData(data);
319 request.handleData(data);
320 if (getNbRead() > CHUNK_SIZE[0]) {
321 System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
322 }
323 }
324
325 @Override
326 public void handleCompleted() {
327 nbRead[0] += getNbRead();
328 if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
329 if (isCancelled()) {
330 request.cancel();
331 }
332 else {
333 request.done();
334 }
335 isFinished[0] = Boolean.TRUE;
336 }
337 super.handleCompleted();
338 }
339 };
340
341 if (!isFinished[0]) {
342 queueRequest(subRequest);
343
344 try {
345 subRequest.waitForCompletion();
346 } catch (InterruptedException e) {
347 e.printStackTrace();
348 }
349
350 CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
351 }
352 }
353 }
354 };
355
356 thread.start();
357 }
358
359 /**
360 * Initialize the provider based on the request. The context is
361 * provider specific and will be updated by getNext().
362 *
363 * @param request
364 * @return an application specific context; null if request can't be serviced
365 */
366 public abstract ITmfContext armRequest(ITmfDataRequest<T> request);
367 public abstract T getNext(ITmfContext context);
368
369 /**
370 * Checks if the data meets the request completion criteria.
371 *
372 * @param request
373 * @param data
374 * @return
375 */
376 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
377 return request.isCompleted() || nbRead >= request.getNbRequested() || data.isNullRef();
378 }
379
380 // ------------------------------------------------------------------------
381 // Signal handlers
382 // ------------------------------------------------------------------------
383
384 @TmfSignalHandler
385 public void startSynch(TmfStartSynchSignal signal) {
386 synchronized (fLock) {
387 fSignalDepth++;
388 }
389 }
390
391 @TmfSignalHandler
392 public void endSynch(TmfEndSynchSignal signal) {
393 synchronized (fLock) {
394 fSignalDepth--;
395 if (fSignalDepth == 0) {
396 fireRequest();
397 }
398 }
399 }
400
401 }
This page took 0.039834 seconds and 5 git commands to generate.