1 /*******************************************************************************
2 * Copyright (c) 2009, 2014 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation, replace background
11 * requests by preemptable requests
12 * Alexandre Montplaisir - Merge with TmfDataProvider
13 * Bernd Hufmann - Add timer based coalescing for background requests
14 *******************************************************************************/
16 package org
.eclipse
.linuxtools
.tmf
.core
.component
;
18 import java
.util
.Iterator
;
19 import java
.util
.LinkedList
;
20 import java
.util
.List
;
21 import java
.util
.Timer
;
22 import java
.util
.TimerTask
;
24 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.TmfCoreTracer
;
25 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.component
.TmfEventThread
;
26 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.component
.TmfProviderManager
;
27 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.request
.TmfCoalescedEventRequest
;
28 import org
.eclipse
.linuxtools
.internal
.tmf
.core
.request
.TmfRequestExecutor
;
29 import org
.eclipse
.linuxtools
.tmf
.core
.event
.ITmfEvent
;
30 import org
.eclipse
.linuxtools
.tmf
.core
.request
.ITmfEventRequest
;
31 import org
.eclipse
.linuxtools
.tmf
.core
.request
.ITmfEventRequest
.ExecutionType
;
32 import org
.eclipse
.linuxtools
.tmf
.core
.signal
.TmfEndSynchSignal
;
33 import org
.eclipse
.linuxtools
.tmf
.core
.signal
.TmfSignalHandler
;
34 import org
.eclipse
.linuxtools
.tmf
.core
.signal
.TmfStartSynchSignal
;
35 import org
.eclipse
.linuxtools
.tmf
.core
.timestamp
.ITmfTimestamp
;
36 import org
.eclipse
.linuxtools
.tmf
.core
.trace
.ITmfContext
;
39 * An abstract base class that implements ITmfEventProvider.
41 * This abstract class implements the housekeeping methods to register/
42 * de-register the event provider and to handle generically the event requests.
45 * @author Francois Chouinard
48 public abstract class TmfEventProvider
extends TmfComponent
implements ITmfEventProvider
{
50 // ------------------------------------------------------------------------
52 // ------------------------------------------------------------------------
54 /** Default amount of events per request "chunk"
56 public static final int DEFAULT_BLOCK_SIZE
= 50000;
58 /** Delay for coalescing background requests (in milliseconds) */
59 private static final long DELAY
= 1000;
61 // ------------------------------------------------------------------------
63 // ------------------------------------------------------------------------
65 /** List of coalesced requests */
66 private final List
<TmfCoalescedEventRequest
> fPendingCoalescedRequests
= new LinkedList
<>();
68 /** The type of event handled by this provider */
69 private Class
<?
extends ITmfEvent
> fType
;
71 private final TmfRequestExecutor fExecutor
;
73 private final Object fLock
= new Object();
75 private int fSignalDepth
= 0;
77 private int fRequestPendingCounter
= 0;
81 private boolean fIsTimeout
= false;
83 // ------------------------------------------------------------------------
85 // ------------------------------------------------------------------------
90 public TmfEventProvider() {
92 fExecutor
= new TmfRequestExecutor();
96 * Standard constructor. Instantiate and initialize at the same time.
99 * Name of the provider
101 * The type of events that will be handled
103 public TmfEventProvider(String name
, Class
<?
extends ITmfEvent
> type
) {
109 * Initialize this data provider
112 * Name of the provider
114 * The type of events that will be handled
116 public void init(String name
, Class
<?
extends ITmfEvent
> type
) {
123 synchronized (fLock
) {
124 fTimer
= new Timer();
127 TmfProviderManager
.register(fType
, this);
131 public void dispose() {
132 TmfProviderManager
.deregister(fType
, this);
134 synchronized (fLock
) {
135 if (fTimer
!= null) {
143 // ------------------------------------------------------------------------
145 // ------------------------------------------------------------------------
148 * Get the event type this provider handles
150 * @return The type of ITmfEvent
152 public Class
<?
extends ITmfEvent
> getType() {
156 // ------------------------------------------------------------------------
157 // ITmfRequestHandler
158 // ------------------------------------------------------------------------
164 public void sendRequest(final ITmfEventRequest request
) {
165 synchronized (fLock
) {
166 if (request
.getExecType() == ExecutionType
.FOREGROUND
) {
167 if ((fSignalDepth
> 0) || (fRequestPendingCounter
> 0)) {
168 coalesceEventRequest(request
);
170 queueRequest(request
);
176 * Dispatch request in case timer is not running.
178 if (fTimer
== null) {
179 queueRequest(request
);
184 * For the first background request in the request pending queue
185 * a timer will be started to allow other background requests to
188 boolean startTimer
= (getNbPendingBackgroundRequests() == 0);
189 coalesceEventRequest(request
);
191 TimerTask task
= new TimerTask() {
194 synchronized (fLock
) {
200 fTimer
.schedule(task
, DELAY
);
205 private void fireRequest() {
206 synchronized (fLock
) {
207 if (fRequestPendingCounter
> 0) {
211 if (fPendingCoalescedRequests
.size() > 0) {
212 Iterator
<TmfCoalescedEventRequest
> iter
= fPendingCoalescedRequests
.iterator();
213 while (iter
.hasNext()) {
214 ExecutionType type
= (fIsTimeout ? ExecutionType
.BACKGROUND
: ExecutionType
.FOREGROUND
);
215 ITmfEventRequest request
= iter
.next();
216 if (type
== request
.getExecType()) {
217 queueRequest(request
);
226 * Increments/decrements the pending requests counters and fires the request
227 * if necessary (counter == 0). Used for coalescing requests across multiple
231 * Should we increment (true) or decrement (false) the pending
235 public void notifyPendingRequest(boolean isIncrement
) {
236 synchronized (fLock
) {
238 fRequestPendingCounter
++;
240 if (fRequestPendingCounter
> 0) {
241 fRequestPendingCounter
--;
244 // fire request if all pending requests are received
245 if (fRequestPendingCounter
== 0) {
252 // ------------------------------------------------------------------------
254 // ------------------------------------------------------------------------
257 * Create a new request from an existing one, and add it to the coalesced
261 * The request to copy
264 protected void newCoalescedEventRequest(ITmfEventRequest request
) {
265 synchronized (fLock
) {
266 TmfCoalescedEventRequest coalescedRequest
= new TmfCoalescedEventRequest(
267 request
.getDataType(),
270 request
.getNbRequested(),
271 request
.getExecType());
272 coalescedRequest
.addRequest(request
);
273 if (TmfCoreTracer
.isRequestTraced()) {
274 TmfCoreTracer
.traceRequest(request
, "COALESCED with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
275 TmfCoreTracer
.traceRequest(coalescedRequest
, "now contains " + coalescedRequest
.getSubRequestIds()); //$NON-NLS-1$
277 fPendingCoalescedRequests
.add(coalescedRequest
);
282 * Add an existing request to the list of coalesced ones
285 * The request to add to the list
288 protected void coalesceEventRequest(ITmfEventRequest request
) {
289 synchronized (fLock
) {
290 for (TmfCoalescedEventRequest coalescedRequest
: fPendingCoalescedRequests
) {
291 if (coalescedRequest
.isCompatible(request
)) {
292 coalescedRequest
.addRequest(request
);
293 if (TmfCoreTracer
.isRequestTraced()) {
294 TmfCoreTracer
.traceRequest(request
, "COALESCED with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
295 TmfCoreTracer
.traceRequest(coalescedRequest
, "now contains " + coalescedRequest
.getSubRequestIds()); //$NON-NLS-1$
300 newCoalescedEventRequest(request
);
305 * Gets the number of background requests in pending queue.
307 * @return the number of background requests in pending queue
309 private int getNbPendingBackgroundRequests() {
310 int nbBackgroundRequests
= 0;
311 synchronized (fLock
) {
312 for (ITmfEventRequest request
: fPendingCoalescedRequests
) {
313 if (request
.getExecType() == ExecutionType
.BACKGROUND
) {
314 nbBackgroundRequests
++;
318 return nbBackgroundRequests
;
321 // ------------------------------------------------------------------------
322 // Request processing
323 // ------------------------------------------------------------------------
332 protected void queueRequest(final ITmfEventRequest request
) {
334 if (fExecutor
.isShutdown()) {
339 TmfEventThread thread
= new TmfEventThread(this, request
);
341 if (TmfCoreTracer
.isRequestTraced()) {
342 TmfCoreTracer
.traceRequest(request
, "QUEUED"); //$NON-NLS-1$
345 fExecutor
.execute(thread
);
349 * Initialize the provider based on the request. The context is provider
350 * specific and will be updated by getNext().
354 * @return An application specific context; null if request can't be
358 public abstract ITmfContext
armRequest(ITmfEventRequest request
);
361 * Checks if the data meets the request completion criteria.
368 * The number of events read so far
369 * @return true if completion criteria is met
372 public boolean isCompleted(ITmfEventRequest request
, ITmfEvent event
, int nbRead
) {
373 boolean requestCompleted
= isCompleted2(request
, nbRead
);
374 if (!requestCompleted
) {
375 ITmfTimestamp endTime
= request
.getRange().getEndTime();
376 return event
.getTimestamp().compareTo(endTime
, false) > 0;
378 return requestCompleted
;
381 private static boolean isCompleted2(ITmfEventRequest request
,int nbRead
) {
382 return request
.isCompleted() || nbRead
>= request
.getNbRequested();
385 // ------------------------------------------------------------------------
386 // Pass-throughs to the request executor
387 // ------------------------------------------------------------------------
390 * @return the shutdown state (i.e. if it is accepting new requests)
393 protected boolean executorIsShutdown() {
394 return fExecutor
.isShutdown();
398 * @return the termination state
401 protected boolean executorIsTerminated() {
402 return fExecutor
.isTerminated();
405 // ------------------------------------------------------------------------
407 // ------------------------------------------------------------------------
410 * Handler for the start synch signal
416 public void startSynch(TmfStartSynchSignal signal
) {
417 synchronized (fLock
) {
423 * Handler for the end synch signal
429 public void endSynch(TmfEndSynchSignal signal
) {
430 synchronized (fLock
) {
432 if (fSignalDepth
== 0) {