tmf: Add the Tmf- prefix to the state system interfaces
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfDataProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.core.component;
14
15 import java.util.Vector;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.SynchronousQueue;
19
20 import org.eclipse.linuxtools.internal.tmf.core.TmfCoreTracer;
21 import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager;
22 import org.eclipse.linuxtools.internal.tmf.core.component.TmfThread;
23 import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedDataRequest;
24 import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
25 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
26 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
27 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
28 import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
29 import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
30 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
31 import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
32 import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
33
34 /**
35 * An abstract base class that implements ITmfDataProvider.
36 * <p>
37 * This abstract class implements the housekeeping methods to register/
38 * de-register the event provider and to handle generically the event requests.
39 * <p>
40 * The concrete class can either re-implement processRequest() entirely or just
41 * implement the hooks (initializeContext() and getNext()).
42 * <p>
43 * TODO: Add support for providing multiple data types.
44 *
45 * @version 1.0
46 * @author Francois Chouinard
47 */
48 public abstract class TmfDataProvider extends TmfComponent implements ITmfDataProvider {
49
50 // ------------------------------------------------------------------------
51 // Constants
52 // ------------------------------------------------------------------------
53
54 /** Default amount of events per request "chunk" */
55 public static final int DEFAULT_BLOCK_SIZE = 50000;
56
57 /** Default size of the queue */
58 public static final int DEFAULT_QUEUE_SIZE = 1000;
59
60 // ------------------------------------------------------------------------
61 // Attributes
62 // ------------------------------------------------------------------------
63
64 protected Class<? extends ITmfEvent> fType;
65 protected boolean fLogData;
66 protected boolean fLogError;
67
68 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
69 protected BlockingQueue<ITmfEvent> fDataQueue;
70 private TmfRequestExecutor fExecutor;
71
72 private int fSignalDepth = 0;
73 private final Object fLock = new Object();
74
75 private int fRequestPendingCounter = 0;
76
77 // ------------------------------------------------------------------------
78 // Constructors
79 // ------------------------------------------------------------------------
80
81 /**
82 * Default constructor
83 */
84 public TmfDataProvider() {
85 super();
86 fQueueSize = DEFAULT_QUEUE_SIZE;
87 fDataQueue = new LinkedBlockingQueue<ITmfEvent>(fQueueSize);
88 fExecutor = new TmfRequestExecutor();
89 }
90
91 /**
92 * Initialize this data provider
93 *
94 * @param name
95 * Name of the provider
96 * @param type
97 * The type of events that will be handled
98 */
99 public void init(String name, Class<? extends ITmfEvent> type) {
100 super.init(name);
101 fType = type;
102 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<ITmfEvent>(fQueueSize) : new SynchronousQueue<ITmfEvent>();
103
104 fExecutor = new TmfRequestExecutor();
105 fSignalDepth = 0;
106
107 fLogData = TmfCoreTracer.isEventTraced();
108 // fLogError = TmfCoreTracer.isErrorTraced();
109
110 TmfProviderManager.register(fType, this);
111 }
112
113 protected TmfDataProvider(String name, Class<? extends ITmfEvent> type, int queueSize) {
114 this();
115 fQueueSize = queueSize;
116 init(name, type);
117 }
118
119 /**
120 * Copy constructor
121 *
122 * @param other
123 * The other object to copy
124 */
125 public TmfDataProvider(TmfDataProvider other) {
126 this();
127 init(other.getName(), other.fType);
128 }
129
130 /**
131 * Standard constructor. Instantiate and initialize at the same time.
132 *
133 * @param name
134 * Name of the provider
135 * @param type
136 * The type of events that will be handled
137 */
138 public TmfDataProvider(String name, Class<? extends ITmfEvent> type) {
139 this(name, type, DEFAULT_QUEUE_SIZE);
140 }
141
142 @Override
143 public void dispose() {
144 TmfProviderManager.deregister(fType, this);
145 fExecutor.stop();
146 super.dispose();
147 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
148 }
149
150 // ------------------------------------------------------------------------
151 // Accessors
152 // ------------------------------------------------------------------------
153
154 /**
155 * Get the queue size of this provider
156 *
157 * @return The size of the queue
158 */
159 public int getQueueSize() {
160 return fQueueSize;
161 }
162
163 /**
164 * Get the event type this provider handles
165 *
166 * @return The type of ITmfEvent
167 */
168 public Class<?> getType() {
169 return fType;
170 }
171
172 // ------------------------------------------------------------------------
173 // ITmfRequestHandler
174 // ------------------------------------------------------------------------
175
176 @Override
177 public void sendRequest(final ITmfDataRequest request) {
178 synchronized (fLock) {
179 if (fSignalDepth > 0) {
180 coalesceDataRequest(request);
181 } else {
182 dispatchRequest(request);
183 }
184 }
185 }
186
187 @Override
188 public void fireRequest() {
189 synchronized (fLock) {
190 if (fRequestPendingCounter > 0) {
191 return;
192 }
193 if (fPendingCoalescedRequests.size() > 0) {
194 for (TmfDataRequest request : fPendingCoalescedRequests) {
195 dispatchRequest(request);
196 }
197 fPendingCoalescedRequests.clear();
198 }
199 }
200 }
201
202 /**
203 * Increments/decrements the pending requests counters and fires the request
204 * if necessary (counter == 0). Used for coalescing requests across multiple
205 * TmfDataProvider's.
206 *
207 * @param isIncrement
208 * Should we increment (true) or decrement (false) the pending
209 * counter
210 */
211 @Override
212 public void notifyPendingRequest(boolean isIncrement) {
213 synchronized (fLock) {
214 if (isIncrement) {
215 if (fSignalDepth > 0) {
216 fRequestPendingCounter++;
217 }
218 } else {
219 if (fRequestPendingCounter > 0) {
220 fRequestPendingCounter--;
221 }
222
223 // fire request if all pending requests are received
224 if (fRequestPendingCounter == 0) {
225 fireRequest();
226 }
227 }
228 }
229 }
230
231 // ------------------------------------------------------------------------
232 // Coalescing (primitive test...)
233 // ------------------------------------------------------------------------
234
235 protected Vector<TmfCoalescedDataRequest> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest>();
236
237 protected void newCoalescedDataRequest(ITmfDataRequest request) {
238 synchronized (fLock) {
239 TmfCoalescedDataRequest coalescedRequest = new TmfCoalescedDataRequest(request.getDataType(), request.getIndex(),
240 request.getNbRequested(), request.getBlockSize(), request.getExecType());
241 coalescedRequest.addRequest(request);
242 if (TmfCoreTracer.isRequestTraced()) {
243 TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
244 TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
245 }
246 fPendingCoalescedRequests.add(coalescedRequest);
247 }
248 }
249
250 protected void coalesceDataRequest(ITmfDataRequest request) {
251 synchronized (fLock) {
252 for (TmfCoalescedDataRequest coalescedRequest : fPendingCoalescedRequests) {
253 if (coalescedRequest.isCompatible(request)) {
254 coalescedRequest.addRequest(request);
255 if (TmfCoreTracer.isRequestTraced()) {
256 TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
257 TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
258 }
259 return;
260 }
261 }
262 newCoalescedDataRequest(request);
263 }
264 }
265
266 // ------------------------------------------------------------------------
267 // Request processing
268 // ------------------------------------------------------------------------
269
270 private void dispatchRequest(final ITmfDataRequest request) {
271 if (request.getExecType() == ExecutionType.FOREGROUND) {
272 queueRequest(request);
273 } else {
274 queueBackgroundRequest(request, request.getBlockSize(), true);
275 }
276 }
277
278 protected void queueRequest(final ITmfDataRequest request) {
279
280 if (fExecutor.isShutdown()) {
281 request.cancel();
282 return;
283 }
284
285 final TmfDataProvider provider = this;
286
287 // Process the request
288 TmfThread thread = new TmfThread(request.getExecType()) {
289
290 @Override
291 public void run() {
292
293 if (TmfCoreTracer.isRequestTraced()) {
294 TmfCoreTracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
295 }
296
297 // Extract the generic information
298 request.start();
299 int nbRequested = request.getNbRequested();
300 int nbRead = 0;
301
302 // Initialize the execution
303 ITmfContext context = armRequest(request);
304 if (context == null) {
305 request.cancel();
306 return;
307 }
308
309 try {
310 // Get the ordered events
311 ITmfEvent data = getNext(context);
312 if (TmfCoreTracer.isRequestTraced()) {
313 TmfCoreTracer.traceRequest(request, "read first event"); //$NON-NLS-1$
314 }
315 while (data != null && !isCompleted(request, data, nbRead)) {
316 if (fLogData) {
317 TmfCoreTracer.traceEvent(provider, request, data);
318 }
319 if (request.getDataType().isInstance(data)) {
320 request.handleData(data);
321 }
322
323 // To avoid an unnecessary read passed the last data
324 // requested
325 if (++nbRead < nbRequested) {
326 data = getNext(context);
327 }
328 }
329 if (TmfCoreTracer.isRequestTraced()) {
330 TmfCoreTracer.traceRequest(request, "COMPLETED"); //$NON-NLS-1$
331 }
332
333 if (request.isCancelled()) {
334 request.cancel();
335 } else {
336 request.done();
337 }
338 } catch (Exception e) {
339 request.fail();
340 }
341
342 // Cleanup
343 context.dispose();
344 }
345
346 @Override
347 public void cancel() {
348 if (!request.isCompleted()) {
349 request.cancel();
350 }
351 }
352 };
353
354 if (TmfCoreTracer.isRequestTraced()) {
355 TmfCoreTracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
356 }
357 fExecutor.execute(thread);
358
359 }
360
361 protected void queueBackgroundRequest(final ITmfDataRequest request, final int blockSize, final boolean indexing) {
362
363 final TmfDataProvider provider = this;
364
365 Thread thread = new Thread() {
366 @Override
367 public void run() {
368
369 if (TmfCoreTracer.isRequestTraced()) {
370 TmfCoreTracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
371 }
372
373 request.start();
374
375 final Integer[] CHUNK_SIZE = new Integer[1];
376 CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
377
378 final Integer[] nbRead = new Integer[1];
379 nbRead[0] = 0;
380
381 final Boolean[] isFinished = new Boolean[1];
382 isFinished[0] = Boolean.FALSE;
383
384 while (!isFinished[0]) {
385
386 TmfDataRequest subRequest = new TmfDataRequest(request.getDataType(), request.getIndex()
387 + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) {
388
389 @Override
390 public synchronized boolean isCompleted() {
391 return super.isCompleted() || request.isCompleted();
392 }
393
394 @Override
395 public void handleData(ITmfEvent data) {
396 super.handleData(data);
397 if (request.getDataType().isInstance(data)) {
398 request.handleData(data);
399 }
400 if (getNbRead() > CHUNK_SIZE[0]) {
401 System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
402 }
403 }
404
405 @Override
406 public void handleCompleted() {
407 nbRead[0] += getNbRead();
408 if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
409 if (this.isCancelled()) {
410 request.cancel();
411 } else if (this.isFailed()) {
412 request.fail();
413 } else {
414 request.done();
415 }
416 isFinished[0] = Boolean.TRUE;
417 }
418 super.handleCompleted();
419 }
420 };
421
422 if (!isFinished[0]) {
423 queueRequest(subRequest);
424
425 try {
426 subRequest.waitForCompletion();
427 if (request.isCompleted()) {
428 isFinished[0] = Boolean.TRUE;
429 }
430 } catch (InterruptedException e) {
431 e.printStackTrace();
432 }
433
434 CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
435 }
436 }
437 }
438 };
439
440 thread.start();
441 }
442
443 /**
444 * Initialize the provider based on the request. The context is provider
445 * specific and will be updated by getNext().
446 *
447 * @param request
448 * @return an application specific context; null if request can't be serviced
449 */
450 protected abstract ITmfContext armRequest(ITmfDataRequest request);
451
452 // /**
453 // * Return the next event based on the context supplied. The context
454 // * will be updated for the subsequent read.
455 // *
456 // * @param context the trace read context (updated)
457 // * @return the event referred to by context
458 // */
459 // public abstract T getNext(ITmfContext context);
460
461 /**
462 * Checks if the data meets the request completion criteria.
463 *
464 * @param request the request
465 * @param data the data to verify
466 * @param nbRead the number of events read so far
467 * @return true if completion criteria is met
468 */
469 public boolean isCompleted(ITmfDataRequest request, ITmfEvent data, int nbRead) {
470 return request.isCompleted() || nbRead >= request.getNbRequested();
471 }
472
473 // ------------------------------------------------------------------------
474 // Pass-through's to the request executor
475 // ------------------------------------------------------------------------
476
477 /**
478 * @return the shutdown state (i.e. if it is accepting new requests)
479 * @since 2.0
480 */
481 protected boolean executorIsShutdown() {
482 return fExecutor.isShutdown();
483 }
484
485 /**
486 * @return the termination state
487 * @since 2.0
488 */
489 protected boolean executorIsTerminated() {
490 return fExecutor.isTerminated();
491 }
492
493 // ------------------------------------------------------------------------
494 // Signal handlers
495 // ------------------------------------------------------------------------
496
497 /**
498 * Handler for the start synch signal
499 *
500 * @param signal
501 * Incoming signal
502 */
503 @TmfSignalHandler
504 public void startSynch(TmfStartSynchSignal signal) {
505 synchronized (fLock) {
506 fSignalDepth++;
507 }
508 }
509
510 /**
511 * Handler for the end synch signal
512 *
513 * @param signal
514 * Incoming signal
515 */
516 @TmfSignalHandler
517 public void endSynch(TmfEndSynchSignal signal) {
518 synchronized (fLock) {
519 fSignalDepth--;
520 if (fSignalDepth == 0) {
521 fireRequest();
522 }
523 }
524 }
525
526 }
This page took 0.042617 seconds and 5 git commands to generate.