54b37efc06ff5fe53ccc6ce3b3d717471f136ac1
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfDataProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.core.component;
14
15 import java.util.Vector;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.SynchronousQueue;
19
20 import org.eclipse.linuxtools.internal.tmf.core.Tracer;
21 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
22 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
23 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
24 import org.eclipse.linuxtools.tmf.core.request.TmfCoalescedDataRequest;
25 import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
26 import org.eclipse.linuxtools.tmf.core.request.TmfRequestExecutor;
27 import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
28 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
29 import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
30 import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
31
32 /**
33 * <b><u>TmfProvider</u></b>
34 * <p>
35 * The TmfProvider<T> is a provider for a data of type <T>.
36 * <p>
37 * This abstract class implements the housekeeking methods to register/ deregister the event provider and to handle
38 * generically the event requests.
39 * <p>
40 * The concrete class can either re-implement processRequest() entirely or just implement the hooks (initializeContext()
41 * and getNext()).
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 */
45 public abstract class TmfDataProvider<T extends ITmfEvent> extends TmfComponent implements ITmfDataProvider<T> {
46
47 // ------------------------------------------------------------------------
48 // Constants
49 // ------------------------------------------------------------------------
50
51 public static final int DEFAULT_BLOCK_SIZE = 50000;
52 public static final int DEFAULT_QUEUE_SIZE = 1000;
53
54 // ------------------------------------------------------------------------
55 // Attributes
56 // ------------------------------------------------------------------------
57
58 protected Class<T> fType;
59 protected boolean fLogData;
60 protected boolean fLogError;
61
62 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
63 protected BlockingQueue<T> fDataQueue;
64 protected TmfRequestExecutor fExecutor;
65
66 private int fSignalDepth = 0;
67 private final Object fLock = new Object();
68
69 private int fRequestPendingCounter = 0;
70
71 // ------------------------------------------------------------------------
72 // Constructors
73 // ------------------------------------------------------------------------
74
75 public TmfDataProvider() {
76 super();
77 fQueueSize = DEFAULT_QUEUE_SIZE;
78 fDataQueue = new LinkedBlockingQueue<T>(fQueueSize);
79 fExecutor = new TmfRequestExecutor();
80 }
81
82 public void init(String name, Class<T> type) {
83 super.init(name);
84 fType = type;
85 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
86
87 fExecutor = new TmfRequestExecutor();
88 fSignalDepth = 0;
89
90 fLogData = Tracer.isEventTraced();
91 fLogError = Tracer.isErrorTraced();
92
93 TmfProviderManager.register(fType, this);
94 }
95
96 protected TmfDataProvider(String name, Class<T> type, int queueSize) {
97 this();
98 fQueueSize = queueSize;
99 init(name, type);
100 }
101
102 public TmfDataProvider(TmfDataProvider<T> other) {
103 this();
104 init(other.getName(), other.fType);
105 }
106
107 public TmfDataProvider(String name, Class<T> type) {
108 this(name, type, DEFAULT_QUEUE_SIZE);
109 }
110
111 @Override
112 public void dispose() {
113 TmfProviderManager.deregister(fType, this);
114 fExecutor.stop();
115 super.dispose();
116 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
117 }
118
119 // ------------------------------------------------------------------------
120 // Accessors
121 // ------------------------------------------------------------------------
122
123 public int getQueueSize() {
124 return fQueueSize;
125 }
126
127 public Class<?> getType() {
128 return fType;
129 }
130
131 // ------------------------------------------------------------------------
132 // ITmfRequestHandler
133 // ------------------------------------------------------------------------
134
135 @Override
136 public void sendRequest(final ITmfDataRequest<T> request) {
137 synchronized (fLock) {
138 if (fSignalDepth > 0) {
139 coalesceDataRequest(request);
140 } else {
141 dispatchRequest(request);
142 }
143 }
144 }
145
146 /**
147 * This method queues the coalesced requests.
148 */
149 @Override
150 public void fireRequest() {
151 synchronized (fLock) {
152 if (fRequestPendingCounter > 0) {
153 return;
154 }
155 if (fPendingCoalescedRequests.size() > 0) {
156 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
157 dispatchRequest(request);
158 }
159 fPendingCoalescedRequests.clear();
160 }
161 }
162 }
163
164 /**
165 * Increments/decrements the pending requests counters and fires the request if necessary (counter == 0). Used for
166 * coalescing requests accross multiple TmfDataProvider.
167 *
168 * @param isIncrement
169 */
170 @Override
171 public void notifyPendingRequest(boolean isIncrement) {
172 synchronized (fLock) {
173 if (isIncrement) {
174 if (fSignalDepth > 0) {
175 fRequestPendingCounter++;
176 }
177 } else {
178 if (fRequestPendingCounter > 0) {
179 fRequestPendingCounter--;
180 }
181
182 // fire request if all pending requests are received
183 if (fRequestPendingCounter == 0) {
184 fireRequest();
185 }
186 }
187 }
188 }
189
190 // ------------------------------------------------------------------------
191 // Coalescing (primitive test...)
192 // ------------------------------------------------------------------------
193
194 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
195
196 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
197 synchronized (fLock) {
198 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(request.getDataType(), request.getIndex(),
199 request.getNbRequested(), request.getBlockSize(), request.getExecType());
200 coalescedRequest.addRequest(request);
201 if (Tracer.isRequestTraced()) {
202 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
203 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
204 }
205 fPendingCoalescedRequests.add(coalescedRequest);
206 }
207 }
208
209 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
210 synchronized (fLock) {
211 for (TmfCoalescedDataRequest<T> coalescedRequest : fPendingCoalescedRequests) {
212 if (coalescedRequest.isCompatible(request)) {
213 coalescedRequest.addRequest(request);
214 if (Tracer.isRequestTraced()) {
215 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
216 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
217 }
218 return;
219 }
220 }
221 newCoalescedDataRequest(request);
222 }
223 }
224
225 // ------------------------------------------------------------------------
226 // Request processing
227 // ------------------------------------------------------------------------
228
229 private void dispatchRequest(final ITmfDataRequest<T> request) {
230 if (request.getExecType() == ExecutionType.FOREGROUND)
231 queueRequest(request);
232 else
233 queueBackgroundRequest(request, request.getBlockSize(), true);
234 }
235
236 protected void queueRequest(final ITmfDataRequest<T> request) {
237
238 if (fExecutor.isShutdown()) {
239 request.cancel();
240 return;
241 }
242
243 final TmfDataProvider<T> provider = this;
244
245 // Process the request
246 TmfThread thread = new TmfThread(request.getExecType()) {
247
248 @Override
249 public void run() {
250
251 if (Tracer.isRequestTraced()) {
252 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
253 }
254
255 // Extract the generic information
256 request.start();
257 int nbRequested = request.getNbRequested();
258 int nbRead = 0;
259
260 // Initialize the execution
261 ITmfContext context = armRequest(request);
262 if (context == null) {
263 request.cancel();
264 return;
265 }
266
267 try {
268 // Get the ordered events
269 T data = getNext(context);
270 if (Tracer.isRequestTraced())
271 Tracer.traceRequest(request, "read first event"); //$NON-NLS-1$
272 while (data != null && !isCompleted(request, data, nbRead)) {
273 // if (fLogData)
274 // Tracer.traceEvent(provider, request, data);
275 if (request.getDataType().isInstance(data)) {
276 request.handleData(data);
277 }
278
279 // To avoid an unnecessary read passed the last data
280 // requested
281 if (++nbRead < nbRequested) {
282 data = getNext(context);
283 }
284 }
285 if (Tracer.isRequestTraced())
286 Tracer.traceRequest(request, "COMPLETED"); //$NON-NLS-1$
287
288 if (request.isCancelled()) {
289 request.cancel();
290 } else {
291 request.done();
292 }
293 } catch (Exception e) {
294 request.fail();
295 }
296
297 // Cleanup
298 context.dispose();
299 }
300
301 @Override
302 public void cancel() {
303 if (!request.isCompleted()) {
304 request.cancel();
305 }
306 }
307 };
308
309 if (Tracer.isRequestTraced())
310 Tracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
311 fExecutor.execute(thread);
312
313 }
314
315 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean indexing) {
316
317 final TmfDataProvider<T> provider = this;
318
319 Thread thread = new Thread() {
320 @Override
321 public void run() {
322
323 if (Tracer.isRequestTraced()) {
324 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
325 }
326
327 request.start();
328
329 final Integer[] CHUNK_SIZE = new Integer[1];
330 CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
331
332 final Integer[] nbRead = new Integer[1];
333 nbRead[0] = 0;
334
335 final Boolean[] isFinished = new Boolean[1];
336 isFinished[0] = Boolean.FALSE;
337
338 while (!isFinished[0]) {
339
340 TmfDataRequest<T> subRequest = new TmfDataRequest<T>(request.getDataType(), request.getIndex()
341 + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) {
342 @Override
343 public void handleData(T data) {
344 super.handleData(data);
345 if (request.getDataType().isInstance(data)) {
346 request.handleData(data);
347 }
348 if (getNbRead() > CHUNK_SIZE[0]) {
349 System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
350 }
351 }
352
353 @Override
354 public void handleCompleted() {
355 nbRead[0] += getNbRead();
356 if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
357 if (this.isCancelled()) {
358 request.cancel();
359 } else if (this.isFailed()) {
360 request.fail();
361 } else {
362 request.done();
363 }
364 isFinished[0] = Boolean.TRUE;
365 }
366 super.handleCompleted();
367 }
368 };
369
370 if (!isFinished[0]) {
371 queueRequest(subRequest);
372
373 try {
374 subRequest.waitForCompletion();
375 } catch (InterruptedException e) {
376 e.printStackTrace();
377 }
378
379 CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
380 }
381 }
382 }
383 };
384
385 thread.start();
386 }
387
388 /**
389 * Initialize the provider based on the request. The context is provider
390 * specific and will be updated by getNext().
391 *
392 * @param request
393 * @return an application specific context; null if request can't be serviced
394 */
395 protected abstract ITmfContext armRequest(ITmfDataRequest<T> request);
396
397 // /**
398 // * Return the next event based on the context supplied. The context
399 // * will be updated for the subsequent read.
400 // *
401 // * @param context the trace read context (updated)
402 // * @return the event referred to by context
403 // */
404 // public abstract T getNext(ITmfContext context);
405
406 /**
407 * Checks if the data meets the request completion criteria.
408 *
409 * @param request the request
410 * @param data the data to verify
411 * @param nbRead the number of events read so far
412 * @return true if completion criteria is met
413 */
414 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
415 return request.isCompleted() || nbRead >= request.getNbRequested();
416 }
417
418 // ------------------------------------------------------------------------
419 // Signal handlers
420 // ------------------------------------------------------------------------
421
422 @TmfSignalHandler
423 public void startSynch(TmfStartSynchSignal signal) {
424 synchronized (fLock) {
425 fSignalDepth++;
426 }
427 }
428
429 @TmfSignalHandler
430 public void endSynch(TmfEndSynchSignal signal) {
431 synchronized (fLock) {
432 fSignalDepth--;
433 if (fSignalDepth == 0) {
434 fireRequest();
435 }
436 }
437 }
438
439 }
This page took 0.059342 seconds and 5 git commands to generate.