1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
13 package org
.eclipse
.linuxtools
.tmf
.component
;
15 import java
.util
.Vector
;
16 import java
.util
.concurrent
.BlockingQueue
;
17 import java
.util
.concurrent
.LinkedBlockingQueue
;
18 import java
.util
.concurrent
.SynchronousQueue
;
20 import org
.eclipse
.linuxtools
.tmf
.Tracer
;
21 import org
.eclipse
.linuxtools
.tmf
.event
.TmfData
;
22 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
;
23 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
.ExecutionType
;
24 import org
.eclipse
.linuxtools
.tmf
.request
.TmfCoalescedDataRequest
;
25 import org
.eclipse
.linuxtools
.tmf
.request
.TmfDataRequest
;
26 import org
.eclipse
.linuxtools
.tmf
.request
.TmfRequestExecutor
;
27 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfEndSynchSignal
;
28 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfSignalHandler
;
29 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfStartSynchSignal
;
30 import org
.eclipse
.linuxtools
.tmf
.trace
.ITmfContext
;
33 * <b><u>TmfProvider</u></b>
35 * The TmfProvider<T> is a provider for a data of type <T>.
37 * This abstract class implements the housekeeking methods to register/
38 * deregister the event provider and to handle generically the event requests.
40 * The concrete class can either re-implement processRequest() entirely or
41 * just implement the hooks (initializeContext() and getNext()).
43 * TODO: Add support for providing multiple data types.
45 public abstract class TmfDataProvider
<T
extends TmfData
> extends TmfComponent
implements ITmfDataProvider
<T
> {
47 // ------------------------------------------------------------------------
49 // ------------------------------------------------------------------------
51 // private static final ITmfDataRequest.ExecutionType SHORT = ITmfDataRequest.ExecutionType.SHORT;
52 // private static final ITmfDataRequest.ExecutionType LONG = ITmfDataRequest.ExecutionType.LONG;
54 // ------------------------------------------------------------------------
56 // ------------------------------------------------------------------------
58 final protected Class
<T
> fType
;
59 final protected boolean fLogData
;
60 final protected boolean fLogError
;
62 public static final int DEFAULT_BLOCK_SIZE
= 50000;
63 public static final int DEFAULT_QUEUE_SIZE
= 1000;
65 protected final int fQueueSize
;
66 protected final BlockingQueue
<T
> fDataQueue
;
67 protected final TmfRequestExecutor fExecutor
;
69 private int fSignalDepth
= 0;
70 private final Object fLock
= new Object();
72 private int fRequestPendingCounter
= 0;
74 // ------------------------------------------------------------------------
76 // ------------------------------------------------------------------------
78 public TmfDataProvider(String name
, Class
<T
> type
) {
79 this(name
, type
, DEFAULT_QUEUE_SIZE
);
82 protected TmfDataProvider(String name
, Class
<T
> type
, int queueSize
) {
85 fQueueSize
= queueSize
;
86 fDataQueue
= (fQueueSize
> 1) ?
new LinkedBlockingQueue
<T
>(fQueueSize
) : new SynchronousQueue
<T
>();
88 fExecutor
= new TmfRequestExecutor();
91 fLogData
= Tracer
.isEventTraced();
92 fLogError
= Tracer
.isErrorTraced();
94 TmfProviderManager
.register(fType
, this);
95 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "started");
98 public TmfDataProvider(TmfDataProvider
<T
> other
) {
101 fQueueSize
= other
.fQueueSize
;
102 fDataQueue
= (fQueueSize
> 1) ?
new LinkedBlockingQueue
<T
>(fQueueSize
) : new SynchronousQueue
<T
>();
104 fExecutor
= new TmfRequestExecutor();
107 fLogData
= Tracer
.isEventTraced();
108 fLogError
= Tracer
.isErrorTraced();
112 public void dispose() {
113 TmfProviderManager
.deregister(fType
, this);
116 // if (Tracer.isComponentTraced()) Tracer.traceComponent(this, "stopped");
119 public int getQueueSize() {
123 public Class
<?
> getType() {
127 // ------------------------------------------------------------------------
128 // ITmfRequestHandler
129 // ------------------------------------------------------------------------
132 public void sendRequest(final ITmfDataRequest
<T
> request
) {
133 synchronized(fLock
) {
134 if (fSignalDepth
> 0) {
135 coalesceDataRequest(request
);
137 dispatchRequest(request
);
143 * This method queues the coalesced requests.
148 public void fireRequest() {
149 synchronized(fLock
) {
150 if (fRequestPendingCounter
> 0) {
153 if (fPendingCoalescedRequests
.size() > 0) {
154 for (TmfDataRequest
<T
> request
: fPendingCoalescedRequests
) {
155 dispatchRequest(request
);
157 fPendingCoalescedRequests
.clear();
163 * Increments/decrements the pending requests counters and fires
164 * the request if necessary (counter == 0). Used for coalescing
165 * requests accross multiple TmfDataProvider.
170 public void notifyPendingRequest(boolean isIncrement
) {
171 synchronized(fLock
) {
173 if (fSignalDepth
> 0) {
174 fRequestPendingCounter
++;
177 if (fRequestPendingCounter
> 0) {
178 fRequestPendingCounter
--;
181 // fire request if all pending requests are received
182 if (fRequestPendingCounter
== 0) {
189 // ------------------------------------------------------------------------
190 // Coalescing (primitive test...)
191 // ------------------------------------------------------------------------
193 protected Vector
<TmfCoalescedDataRequest
<T
>> fPendingCoalescedRequests
= new Vector
<TmfCoalescedDataRequest
<T
>>();
195 protected void newCoalescedDataRequest(ITmfDataRequest
<T
> request
) {
196 synchronized(fLock
) {
197 TmfCoalescedDataRequest
<T
> coalescedRequest
= new TmfCoalescedDataRequest
<T
>(
198 fType
, request
.getIndex(), request
.getNbRequested(),request
.getExecType());
199 coalescedRequest
.addRequest(request
);
200 if (Tracer
.isRequestTraced()) {
201 Tracer
.traceRequest(request
, "coalesced with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
202 Tracer
.traceRequest(coalescedRequest
, "added " + request
.getRequestId()); //$NON-NLS-1$
204 fPendingCoalescedRequests
.add(coalescedRequest
);
208 protected void coalesceDataRequest(ITmfDataRequest
<T
> request
) {
209 synchronized(fLock
) {
210 for (TmfCoalescedDataRequest
<T
> req
: fPendingCoalescedRequests
) {
211 if (req
.isCompatible(request
)) {
212 req
.addRequest(request
);
213 if (Tracer
.isRequestTraced()) {
214 Tracer
.traceRequest(request
, "coalesced with " + req
.getRequestId()); //$NON-NLS-1$
215 Tracer
.traceRequest(req
, "added " + request
.getRequestId()); //$NON-NLS-1$
220 newCoalescedDataRequest(request
);
224 // ------------------------------------------------------------------------
225 // Request processing
226 // ------------------------------------------------------------------------
228 private void dispatchRequest(final ITmfDataRequest
<T
> request
) {
229 if (request
.getExecType() == ExecutionType
.FOREGROUND
)
230 queueRequest(request
);
232 queueBackgroundRequest(request
, DEFAULT_BLOCK_SIZE
, true);
235 protected void queueRequest(final ITmfDataRequest
<T
> request
) {
237 if (fExecutor
.isShutdown()) {
242 final TmfDataProvider
<T
> provider
= this;
244 // Process the request
245 TmfThread thread
= new TmfThread(request
.getExecType()) {
250 if (Tracer
.isRequestTraced()) Tracer
.traceRequest(request
, "started"); //$NON-NLS-1$
252 // Extract the generic information
254 int nbRequested
= request
.getNbRequested();
257 // Initialize the execution
258 ITmfContext context
= armRequest(request
);
259 if (context
== null) {
265 // Get the ordered events
266 if (Tracer
.isRequestTraced()) Tracer
.trace("Request #" + request
.getRequestId() + " is being serviced by " + provider
.getName()); //$NON-NLS-1$//$NON-NLS-2$
267 T data
= getNext(context
);
268 if (Tracer
.isRequestTraced()) Tracer
.trace("Request #" + request
.getRequestId() + " read first event"); //$NON-NLS-1$ //$NON-NLS-2$
269 while (data
!= null && !isCompleted(request
, data
, nbRead
))
271 if (fLogData
) Tracer
.traceEvent(provider
, request
, data
);
272 request
.handleData(data
);
274 // To avoid an unnecessary read passed the last data requested
275 if (++nbRead
< nbRequested
) {
276 data
= getNext(context
);
277 if (Tracer
.isRequestTraced() && (data
== null || data
.isNullRef())) {
278 Tracer
.trace("Request #" + request
.getRequestId() + " end of data"); //$NON-NLS-1$//$NON-NLS-2$
283 if (request
.isCancelled()) {
290 if (Tracer
.isRequestTraced()) Tracer
.traceRequest(request
, "completed"); //$NON-NLS-1$
292 catch (Exception e
) {
293 if (Tracer
.isRequestTraced()) Tracer
.traceRequest(request
, "exception (failed)"); //$NON-NLS-1$
299 fExecutor
.execute(thread
);
301 if (Tracer
.isRequestTraced()) Tracer
.traceRequest(request
, "queued"); //$NON-NLS-1$
304 // By default, same behavior as a foreground request
305 protected void queueBackgroundRequest(final ITmfDataRequest
<T
> request
, final int blockSize
, boolean indexing
) {
306 queueRequest(request
);
310 * Initialize the provider based on the request. The context is
311 * provider specific and will be updated by getNext().
314 * @return an application specific context; null if request can't be serviced
316 public abstract ITmfContext
armRequest(ITmfDataRequest
<T
> request
);
317 public abstract T
getNext(ITmfContext context
);
320 * Checks if the data meets the request completion criteria.
326 public boolean isCompleted(ITmfDataRequest
<T
> request
, T data
, int nbRead
) {
327 return request
.isCompleted() || nbRead
>= request
.getNbRequested() || data
.isNullRef();
330 // ------------------------------------------------------------------------
332 // ------------------------------------------------------------------------
335 public void startSynch(TmfStartSynchSignal signal
) {
336 synchronized (fLock
) {
342 public void endSynch(TmfEndSynchSignal signal
) {
343 synchronized (fLock
) {
345 if (fSignalDepth
== 0) {