Fix a pile of Javadoc warnings
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / component / TmfDataProvider.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
0283f7ff 3 *
8c8bf09f
ASL
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
0283f7ff 8 *
8c8bf09f
ASL
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
6c13869b 13package org.eclipse.linuxtools.tmf.core.component;
8c8bf09f 14
8c8bf09f
ASL
15import java.util.Vector;
16import java.util.concurrent.BlockingQueue;
17import java.util.concurrent.LinkedBlockingQueue;
9b635e61 18import java.util.concurrent.SynchronousQueue;
8c8bf09f 19
4918b8f2 20import org.eclipse.linuxtools.internal.tmf.core.Tracer;
8fd82db5
FC
21import org.eclipse.linuxtools.internal.tmf.core.component.TmfProviderManager;
22import org.eclipse.linuxtools.internal.tmf.core.component.TmfThread;
23import org.eclipse.linuxtools.internal.tmf.core.request.TmfCoalescedDataRequest;
24import org.eclipse.linuxtools.internal.tmf.core.request.TmfRequestExecutor;
72f1e62a 25import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
6c13869b 26import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
4c564a2d 27import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest.ExecutionType;
6c13869b 28import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
6c13869b
FC
29import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
30import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
31import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
32import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
8c8bf09f
ASL
33
34/**
8fd82db5 35 * An abstract base class that implements ITmfDataProvider.
8c8bf09f 36 * <p>
8fd82db5
FC
37 * This abstract class implements the housekeeping methods to register/
38 * de-register the event provider and to handle generically the event requests.
8c8bf09f 39 * <p>
8fd82db5
FC
40 * The concrete class can either re-implement processRequest() entirely or just
41 * implement the hooks (initializeContext() and getNext()).
8c8bf09f
ASL
42 * <p>
43 * TODO: Add support for providing multiple data types.
0283f7ff
FC
44 *
45 * @param <T> The provider event type
46 *
8fd82db5
FC
47 * @version 1.0
48 * @author Francois Chouinard
8c8bf09f 49 */
72f1e62a 50public abstract class TmfDataProvider<T extends ITmfEvent> extends TmfComponent implements ITmfDataProvider<T> {
8c8bf09f 51
948b0607
FC
52 // ------------------------------------------------------------------------
53 // Constants
54 // ------------------------------------------------------------------------
550d787e 55
063f0d27 56 /** Default amount of events per request "chunk" */
00641a97 57 public static final int DEFAULT_BLOCK_SIZE = 50000;
063f0d27
AM
58
59 /** Default size of the queue */
00641a97
FC
60 public static final int DEFAULT_QUEUE_SIZE = 1000;
61
948b0607 62 // ------------------------------------------------------------------------
12c155f5 63 // Attributes
948b0607 64 // ------------------------------------------------------------------------
8c8bf09f 65
12c155f5
FC
66 protected Class<T> fType;
67 protected boolean fLogData;
68 protected boolean fLogError;
f6b14ce2 69
00641a97 70 protected int fQueueSize = DEFAULT_QUEUE_SIZE;
12c155f5
FC
71 protected BlockingQueue<T> fDataQueue;
72 protected TmfRequestExecutor fExecutor;
948b0607
FC
73
74 private int fSignalDepth = 0;
045df77d 75 private final Object fLock = new Object();
951d134a 76
c1c69938
FC
77 private int fRequestPendingCounter = 0;
78
948b0607
FC
79 // ------------------------------------------------------------------------
80 // Constructors
81 // ------------------------------------------------------------------------
82
063f0d27
AM
/**
 * Default constructor.
 * <p>
 * Creates the provider with a data queue of the default size and a new
 * request executor. The name and the event type are set afterwards by
 * {@link #init(String, Class)}.
 */
public TmfDataProvider() {
    super();
    fQueueSize = DEFAULT_QUEUE_SIZE;
    fDataQueue = new LinkedBlockingQueue<T>(DEFAULT_QUEUE_SIZE);
    fExecutor = new TmfRequestExecutor();
}
92
063f0d27
AM
93 /**
94 * Initialize this data provider
95 *
96 * @param name
97 * Name of the provider
98 * @param type
99 * The type of events that will be handled
100 */
3791b5df 101 public void init(String name, Class<T> type) {
12c155f5 102 super.init(name);
3791b5df 103 fType = type;
12c155f5
FC
104 fDataQueue = (fQueueSize > 1) ? new LinkedBlockingQueue<T>(fQueueSize) : new SynchronousQueue<T>();
105
106 fExecutor = new TmfRequestExecutor();
107 fSignalDepth = 0;
108
109 fLogData = Tracer.isEventTraced();
110 fLogError = Tracer.isErrorTraced();
111
112 TmfProviderManager.register(fType, this);
113 }
114
/**
 * Constructor with an explicit queue size. Instantiate and initialize at
 * the same time.
 *
 * @param name
 *            Name of the provider
 * @param type
 *            The type of events that will be handled
 * @param queueSize
 *            The size of the data queue; a value of 1 or less makes
 *            init() create a synchronous queue
 */
protected TmfDataProvider(String name, Class<T> type, int queueSize) {
    this();
    // Must be set before init(), which creates the queue from fQueueSize
    fQueueSize = queueSize;
    init(name, type);
}
120
063f0d27
AM
121 /**
122 * Copy constructor
123 *
124 * @param other
125 * The other object to copy
126 */
948b0607 127 public TmfDataProvider(TmfDataProvider<T> other) {
00641a97
FC
128 this();
129 init(other.getName(), other.fType);
130 }
550d787e 131
063f0d27
AM
/**
 * Standard constructor. Instantiate and initialize at the same time, using
 * the default queue size.
 *
 * @param name
 *            Name of the provider
 * @param type
 *            The type of events that will be handled
 */
public TmfDataProvider(String name, Class<T> type) {
    this(name, type, DEFAULT_QUEUE_SIZE);
}
143
144 @Override
145 public void dispose() {
146 TmfProviderManager.deregister(fType, this);
147 fExecutor.stop();
148 super.dispose();
12c155f5 149 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
948b0607
FC
150 }
151
00641a97
FC
152 // ------------------------------------------------------------------------
153 // Accessors
154 // ------------------------------------------------------------------------
155
063f0d27
AM
156 /**
157 * Get the queue size of this provider
158 *
159 * @return The size of the queue
160 */
948b0607
FC
161 public int getQueueSize() {
162 return fQueueSize;
163 }
164
063f0d27
AM
165 /**
166 * Get the event type this provider handles
167 *
168 * @return The type of ITmfEvent
169 */
948b0607
FC
170 public Class<?> getType() {
171 return fType;
172 }
173
174 // ------------------------------------------------------------------------
175 // ITmfRequestHandler
176 // ------------------------------------------------------------------------
177
178 @Override
179 public void sendRequest(final ITmfDataRequest<T> request) {
180 synchronized (fLock) {
181 if (fSignalDepth > 0) {
182 coalesceDataRequest(request);
183 } else {
184 dispatchRequest(request);
185 }
186 }
187 }
188
948b0607 189 @Override
c1c69938 190 public void fireRequest() {
948b0607
FC
191 synchronized (fLock) {
192 if (fRequestPendingCounter > 0) {
193 return;
194 }
195 if (fPendingCoalescedRequests.size() > 0) {
196 for (TmfDataRequest<T> request : fPendingCoalescedRequests) {
197 dispatchRequest(request);
198 }
199 fPendingCoalescedRequests.clear();
200 }
201 }
202 }
203
204 /**
063f0d27 205 * Increments/decrements the pending requests counters and fires the request
0283f7ff
FC
206 * if necessary (counter == 0). Used for coalescing requests across multiple
207 * TmfDataProvider's.
063f0d27 208 *
948b0607 209 * @param isIncrement
0283f7ff
FC
210 * Should we increment (true) or decrement (false) the pending
211 * counter
948b0607 212 */
c1c69938
FC
213 @Override
214 public void notifyPendingRequest(boolean isIncrement) {
948b0607 215 synchronized (fLock) {
c1c69938
FC
216 if (isIncrement) {
217 if (fSignalDepth > 0) {
218 fRequestPendingCounter++;
219 }
220 } else {
221 if (fRequestPendingCounter > 0) {
222 fRequestPendingCounter--;
223 }
948b0607 224
c1c69938
FC
225 // fire request if all pending requests are received
226 if (fRequestPendingCounter == 0) {
227 fireRequest();
228 }
229 }
230 }
948b0607
FC
231 }
232
233 // ------------------------------------------------------------------------
234 // Coalescing (primitive test...)
235 // ------------------------------------------------------------------------
236
237 protected Vector<TmfCoalescedDataRequest<T>> fPendingCoalescedRequests = new Vector<TmfCoalescedDataRequest<T>>();
238
239 protected void newCoalescedDataRequest(ITmfDataRequest<T> request) {
240 synchronized (fLock) {
1b70b6dc 241 TmfCoalescedDataRequest<T> coalescedRequest = new TmfCoalescedDataRequest<T>(request.getDataType(), request.getIndex(),
12c155f5 242 request.getNbRequested(), request.getBlockSize(), request.getExecType());
948b0607
FC
243 coalescedRequest.addRequest(request);
244 if (Tracer.isRequestTraced()) {
90891c08
FC
245 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
246 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
247 }
248 fPendingCoalescedRequests.add(coalescedRequest);
249 }
250 }
251
252 protected void coalesceDataRequest(ITmfDataRequest<T> request) {
253 synchronized (fLock) {
254 for (TmfCoalescedDataRequest<T> coalescedRequest : fPendingCoalescedRequests) {
255 if (coalescedRequest.isCompatible(request)) {
256 coalescedRequest.addRequest(request);
257 if (Tracer.isRequestTraced()) {
90891c08
FC
258 Tracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
259 Tracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
948b0607
FC
260 }
261 return;
262 }
263 }
264 newCoalescedDataRequest(request);
265 }
266 }
267
268 // ------------------------------------------------------------------------
269 // Request processing
270 // ------------------------------------------------------------------------
271
272 private void dispatchRequest(final ITmfDataRequest<T> request) {
0283f7ff 273 if (request.getExecType() == ExecutionType.FOREGROUND) {
948b0607 274 queueRequest(request);
0283f7ff 275 } else {
948b0607 276 queueBackgroundRequest(request, request.getBlockSize(), true);
0283f7ff 277 }
948b0607
FC
278 }
279
280 protected void queueRequest(final ITmfDataRequest<T> request) {
281
282 if (fExecutor.isShutdown()) {
283 request.cancel();
284 return;
285 }
286
287 final TmfDataProvider<T> provider = this;
288
289 // Process the request
290 TmfThread thread = new TmfThread(request.getExecType()) {
0283f7ff 291
948b0607
FC
292 @Override
293 public void run() {
294
4cf201de 295 if (Tracer.isRequestTraced()) {
90891c08 296 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
4cf201de 297 }
948b0607
FC
298
299 // Extract the generic information
300 request.start();
301 int nbRequested = request.getNbRequested();
302 int nbRead = 0;
303
304 // Initialize the execution
305 ITmfContext context = armRequest(request);
306 if (context == null) {
307 request.cancel();
308 return;
309 }
310
311 try {
312 // Get the ordered events
313 T data = getNext(context);
314 if (Tracer.isRequestTraced())
0283f7ff 315 {
90891c08 316 Tracer.traceRequest(request, "read first event"); //$NON-NLS-1$
0283f7ff 317 }
948b0607 318 while (data != null && !isCompleted(request, data, nbRead)) {
0283f7ff 319 if (fLogData) {
408e65d2
FC
320 Tracer.traceEvent(provider, request, data);
321 }
1b70b6dc
PT
322 if (request.getDataType().isInstance(data)) {
323 request.handleData(data);
324 }
948b0607
FC
325
326 // To avoid an unnecessary read passed the last data
327 // requested
328 if (++nbRead < nbRequested) {
329 data = getNext(context);
330 }
331 }
332 if (Tracer.isRequestTraced())
0283f7ff 333 {
90891c08 334 Tracer.traceRequest(request, "COMPLETED"); //$NON-NLS-1$
0283f7ff 335 }
948b0607
FC
336
337 if (request.isCancelled()) {
338 request.cancel();
339 } else {
340 request.done();
341 }
342 } catch (Exception e) {
343 request.fail();
344 }
345
346 // Cleanup
347 context.dispose();
348 }
475743b7
FC
349
350 @Override
351 public void cancel() {
8a0edc79
FC
352 if (!request.isCompleted()) {
353 request.cancel();
475743b7
FC
354 }
355 }
948b0607
FC
356 };
357
358 if (Tracer.isRequestTraced())
0283f7ff 359 {
90891c08 360 Tracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
0283f7ff 361 }
948b0607
FC
362 fExecutor.execute(thread);
363
364 }
365
366 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean indexing) {
367
4cf201de
FC
368 final TmfDataProvider<T> provider = this;
369
948b0607
FC
370 Thread thread = new Thread() {
371 @Override
372 public void run() {
4cf201de
FC
373
374 if (Tracer.isRequestTraced()) {
375 Tracer.traceRequest(request, "is being serviced by " + provider.getName()); //$NON-NLS-1$
376 }
377
948b0607
FC
378 request.start();
379
380 final Integer[] CHUNK_SIZE = new Integer[1];
381 CHUNK_SIZE[0] = Math.min(request.getNbRequested(), blockSize + ((indexing) ? 1 : 0));
382
383 final Integer[] nbRead = new Integer[1];
384 nbRead[0] = 0;
385
386 final Boolean[] isFinished = new Boolean[1];
387 isFinished[0] = Boolean.FALSE;
388
389 while (!isFinished[0]) {
390
12c155f5
FC
391 TmfDataRequest<T> subRequest = new TmfDataRequest<T>(request.getDataType(), request.getIndex()
392 + nbRead[0], CHUNK_SIZE[0], blockSize, ExecutionType.BACKGROUND) {
11a2fdf0
PT
393
394 @Override
395 public synchronized boolean isCompleted() {
396 return super.isCompleted() || request.isCompleted();
397 }
398
948b0607
FC
399 @Override
400 public void handleData(T data) {
401 super.handleData(data);
1b70b6dc
PT
402 if (request.getDataType().isInstance(data)) {
403 request.handleData(data);
404 }
948b0607
FC
405 if (getNbRead() > CHUNK_SIZE[0]) {
406 System.out.println("ERROR - Read too many events"); //$NON-NLS-1$
407 }
408 }
409
410 @Override
411 public void handleCompleted() {
412 nbRead[0] += getNbRead();
413 if (nbRead[0] >= request.getNbRequested() || (getNbRead() < CHUNK_SIZE[0])) {
90de83da 414 if (this.isCancelled()) {
948b0607 415 request.cancel();
90de83da 416 } else if (this.isFailed()) {
12c155f5 417 request.fail();
948b0607
FC
418 } else {
419 request.done();
420 }
421 isFinished[0] = Boolean.TRUE;
422 }
423 super.handleCompleted();
424 }
425 };
426
427 if (!isFinished[0]) {
428 queueRequest(subRequest);
429
430 try {
431 subRequest.waitForCompletion();
11a2fdf0
PT
432 if (request.isCompleted()) {
433 isFinished[0] = Boolean.TRUE;
434 }
948b0607
FC
435 } catch (InterruptedException e) {
436 e.printStackTrace();
437 }
438
439 CHUNK_SIZE[0] = Math.min(request.getNbRequested() - nbRead[0], blockSize);
440 }
441 }
442 }
443 };
444
445 thread.start();
446 }
447
448 /**
d337369a
FC
449 * Initialize the provider based on the request. The context is provider
450 * specific and will be updated by getNext().
0283f7ff 451 *
948b0607 452 * @param request
12c155f5 453 * @return an application specific context; null if request can't be serviced
948b0607 454 */
9e0640dc 455 protected abstract ITmfContext armRequest(ITmfDataRequest<T> request);
948b0607 456
c32744d6
FC
457// /**
458// * Return the next event based on the context supplied. The context
459// * will be updated for the subsequent read.
0283f7ff 460// *
c32744d6
FC
461// * @param context the trace read context (updated)
462// * @return the event referred to by context
463// */
464// public abstract T getNext(ITmfContext context);
948b0607
FC
465
466 /**
467 * Checks if the data meets the request completion criteria.
0283f7ff 468 *
0d9a6d76
FC
469 * @param request the request
470 * @param data the data to verify
471 * @param nbRead the number of events read so far
472 * @return true if completion criteria is met
948b0607
FC
473 */
474 public boolean isCompleted(ITmfDataRequest<T> request, T data, int nbRead) {
b6be1c3e 475 return request.isCompleted() || nbRead >= request.getNbRequested();
948b0607
FC
476 }
477
478 // ------------------------------------------------------------------------
479 // Signal handlers
480 // ------------------------------------------------------------------------
481
063f0d27
AM
482 /**
483 * Handler for the start synch signal
484 *
485 * @param signal
486 * Incoming signal
487 */
948b0607
FC
488 @TmfSignalHandler
489 public void startSynch(TmfStartSynchSignal signal) {
490 synchronized (fLock) {
491 fSignalDepth++;
492 }
493 }
494
063f0d27
AM
495 /**
496 * Handler for the end synch signal
497 *
498 * @param signal
499 * Incoming signal
500 */
948b0607
FC
501 @TmfSignalHandler
502 public void endSynch(TmfEndSynchSignal signal) {
045df77d 503 synchronized (fLock) {
948b0607
FC
504 fSignalDepth--;
505 if (fSignalDepth == 0) {
506 fireRequest();
507 }
045df77d 508 }
948b0607 509 }
8c8bf09f
ASL
510
511}
This page took 0.070218 seconds and 5 git commands to generate.