1 /*******************************************************************************
2 * Copyright (c) 2009, 2014 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation, replace background
11 * requests by preemptable requests
12 * Alexandre Montplaisir - Merge with TmfDataProvider
13 * Bernd Hufmann - Add timer based coalescing for background requests
14 *******************************************************************************/
16 package org
.eclipse
.tracecompass
.tmf
.core
.component
;
18 import java
.util
.Iterator
;
19 import java
.util
.LinkedList
;
20 import java
.util
.List
;
21 import java
.util
.Timer
;
22 import java
.util
.TimerTask
;
24 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.TmfCoreTracer
;
25 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.component
.TmfEventThread
;
26 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.component
.TmfProviderManager
;
27 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.request
.TmfCoalescedEventRequest
;
28 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.request
.TmfRequestExecutor
;
29 import org
.eclipse
.tracecompass
.tmf
.core
.event
.ITmfEvent
;
30 import org
.eclipse
.tracecompass
.tmf
.core
.request
.ITmfEventRequest
;
31 import org
.eclipse
.tracecompass
.tmf
.core
.request
.ITmfEventRequest
.ExecutionType
;
32 import org
.eclipse
.tracecompass
.tmf
.core
.signal
.TmfEndSynchSignal
;
33 import org
.eclipse
.tracecompass
.tmf
.core
.signal
.TmfSignalHandler
;
34 import org
.eclipse
.tracecompass
.tmf
.core
.signal
.TmfStartSynchSignal
;
35 import org
.eclipse
.tracecompass
.tmf
.core
.timestamp
.ITmfTimestamp
;
36 import org
.eclipse
.tracecompass
.tmf
.core
.trace
.ITmfContext
;
39 * An abstract base class that implements ITmfEventProvider.
41 * This abstract class implements the housekeeping methods to register/
42 * de-register the event provider and to handle generically the event requests.
45 * @author Francois Chouinard
48 public abstract class TmfEventProvider
extends TmfComponent
implements ITmfEventProvider
{
50 // ------------------------------------------------------------------------
52 // ------------------------------------------------------------------------
54 /** Default amount of events per request "chunk" */
56 public static final int DEFAULT_BLOCK_SIZE
= 50000;
58 /** Delay for coalescing background requests (in milli-seconds) */
59 private static final long DELAY
= 1000;
61 // ------------------------------------------------------------------------
63 // ------------------------------------------------------------------------
65 /** List of coalesced requests */
66 private final List
<TmfCoalescedEventRequest
> fPendingCoalescedRequests
= new LinkedList
<>();
68 /** The type of event handled by this provider */
69 private Class
<?
extends ITmfEvent
> fType
;
// Executor that receives the request threads created in queueRequest()
71 private final TmfRequestExecutor fExecutor
;
// Monitor taken by every method that touches the coalescing state
// (pending list, counters, timer)
73 private final Object fLock
= new Object();
// Foreground requests are coalesced instead of queued while this is > 0.
// NOTE(review): presumably raised/lowered by startSynch()/endSynch() - confirm
75 private int fSignalDepth
= 0;
// Balance of notifyPendingRequest() increments and decrements; foreground
// requests are coalesced instead of queued while this is > 0
77 private int fRequestPendingCounter
= 0;
// Selects what fireRequest() dispatches: BACKGROUND requests when true,
// FOREGROUND requests when false
81 private boolean fIsTimeout
= false;
83 // ------------------------------------------------------------------------
85 // ------------------------------------------------------------------------
// No-argument constructor: sets up fExecutor, the executor that queueRequest()
// later submits request threads to.
90 public TmfEventProvider() {
92 fExecutor
= new TmfRequestExecutor();
96 * Standard constructor. Instantiate and initialize at the same time.
99 * Name of the provider
101 * The type of events that will be handled
// Constructor taking the provider name and the type of events the provider
// will handle (any class implementing ITmfEvent).
103 public TmfEventProvider(String name
, Class
<?
extends ITmfEvent
> type
) {
109 * Initialize this data provider
112 * Name of the provider
114 * The type of events that will be handled
// Initializes the provider for the given name and event type. The steps visible
// here create the coalescing timer and register this provider with
// TmfProviderManager under fType.
116 public void init(String name
, Class
<?
extends ITmfEvent
> type
) {
// fTimer is assigned under fLock, the same monitor dispose() and sendRequest()
// hold when they test it against null
123 synchronized (fLock
) {
124 fTimer
= new Timer();
// make this provider discoverable for its event type
127 TmfProviderManager
.register(fType
, this);
// Tears the provider down: first removes it from TmfProviderManager, then
// deals with the timer while holding fLock.
131 public void dispose() {
132 TmfProviderManager
.deregister(fType
, this);
134 synchronized (fLock
) {
// only touch the timer if one exists
// NOTE(review): presumably cancels and clears fTimer here - confirm
135 if (fTimer
!= null) {
143 // ------------------------------------------------------------------------
145 // ------------------------------------------------------------------------
148 * Get the event type this provider handles
150 * @return The type of ITmfEvent
// Accessor for the event type this provider handles; the declared return type
// is a Class bounded by ITmfEvent.
152 public Class
<?
extends ITmfEvent
> getType() {
156 // ------------------------------------------------------------------------
157 // ITmfRequestHandler
158 // ------------------------------------------------------------------------
// Accepts an event request and decides, while holding fLock, whether it is
// dispatched right away (queueRequest) or held back for coalescing
// (coalesceEventRequest).
164 public void sendRequest(final ITmfEventRequest request
) {
165 synchronized (fLock
) {
// optional trace of the hand-off to this provider
167 if (TmfCoreTracer
.isRequestTraced()) {
168 TmfCoreTracer
.traceRequest(request
.getRequestId(), "SENT to provider " + getName()); //$NON-NLS-1$
// foreground path
171 if (request
.getExecType() == ExecutionType
.FOREGROUND
) {
// hold the request for coalescing while fSignalDepth or
// fRequestPendingCounter is positive
172 if ((fSignalDepth
> 0) || (fRequestPendingCounter
> 0)) {
173 coalesceEventRequest(request
);
// NOTE(review): presumably the alternative branch (nothing pending), where
// the request is dispatched immediately - confirm
175 queueRequest(request
);
181 * Dispatch request in case timer is not running.
// without a timer there is nothing to delay the dispatch, so queue directly
183 if (fTimer
== null) {
184 queueRequest(request
);
189 * For the first background request in the request pending queue
190 * a timer will be started to allow other background requests to
// evaluated BEFORE the request is coalesced: true only when no background
// request is waiting yet
193 boolean startTimer
= (getNbPendingBackgroundRequests() == 0);
194 coalesceEventRequest(request
);
// task handed to fTimer below; the part of it visible here takes fLock
196 TimerTask task
= new TimerTask() {
199 synchronized (fLock
) {
// one-shot schedule, DELAY milliseconds from now
// NOTE(review): presumably only done when startTimer is true - confirm
205 fTimer
.schedule(task
, DELAY
);
// Dispatches pending coalesced requests through queueRequest(), holding fLock
// for the whole pass.
210 private void fireRequest() {
211 synchronized (fLock
) {
// NOTE(review): guard on outstanding pending-request notifications; presumably
// nothing is fired while the counter is positive - confirm
212 if (fRequestPendingCounter
> 0) {
216 if (fPendingCoalescedRequests
.size() > 0) {
217 Iterator
<TmfCoalescedEventRequest
> iter
= fPendingCoalescedRequests
.iterator();
218 while (iter
.hasNext()) {
// execution type dispatched in this pass: BACKGROUND when fIsTimeout is set,
// FOREGROUND otherwise
219 ExecutionType type
= (fIsTimeout ? ExecutionType
.BACKGROUND
: ExecutionType
.FOREGROUND
);
220 ITmfEventRequest request
= iter
.next();
// only requests of the selected execution type are queued
221 if (type
== request
.getExecType()) {
222 queueRequest(request
);
231 * Increments/decrements the pending requests counters and fires the request
232 * if necessary (counter == 0). Used for coalescing requests across multiple
236 * Should we increment (true) or decrement (false) the pending
// Updates fRequestPendingCounter under fLock and reacts when it reaches zero.
240 public void notifyPendingRequest(boolean isIncrement
) {
241 synchronized (fLock
) {
// NOTE(review): presumably the isIncrement == true path - confirm
243 fRequestPendingCounter
++;
// the decrement is guarded so the counter cannot drop below zero
245 if (fRequestPendingCounter
> 0) {
246 fRequestPendingCounter
--;
249 // fire request if all pending requests are received
// NOTE(review): presumably calls fireRequest() inside this check - confirm
250 if (fRequestPendingCounter
== 0) {
257 // ------------------------------------------------------------------------
259 // ------------------------------------------------------------------------
262 * Create a new request from an existing one, and add it to the coalesced
266 * The request to copy
// Wraps the given request in a fresh TmfCoalescedEventRequest and appends that
// wrapper to fPendingCoalescedRequests, all while holding fLock.
269 protected void newCoalescedEventRequest(ITmfEventRequest request
) {
270 synchronized (fLock
) {
// the wrapper is built from the original request's own attributes
// (data type, requested count, execution type among them)
271 TmfCoalescedEventRequest coalescedRequest
= new TmfCoalescedEventRequest(
272 request
.getDataType(),
275 request
.getNbRequested(),
276 request
.getExecType());
// the original request becomes the wrapper's first sub-request
277 coalescedRequest
.addRequest(request
);
278 if (TmfCoreTracer
.isRequestTraced()) {
279 TmfCoreTracer
.traceRequest(request
.getRequestId(), "COALESCED with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
280 TmfCoreTracer
.traceRequest(coalescedRequest
.getRequestId(), "now contains " + coalescedRequest
.getSubRequestIds()); //$NON-NLS-1$
// from here on coalesceEventRequest() can merge compatible requests into it
282 fPendingCoalescedRequests
.add(coalescedRequest
);
287 * Add an existing request to the list of coalesced ones
290 * The request to add to the list
293 protected void coalesceEventRequest(ITmfEventRequest request
) {
294 synchronized (fLock
) {
295 for (TmfCoalescedEventRequest coalescedRequest
: fPendingCoalescedRequests
) {
296 if (coalescedRequest
.isCompatible(request
)) {
297 coalescedRequest
.addRequest(request
);
298 if (TmfCoreTracer
.isRequestTraced()) {
299 TmfCoreTracer
.traceRequest(request
.getRequestId(), "COALESCED with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
300 TmfCoreTracer
.traceRequest(coalescedRequest
.getRequestId(), "now contains " + coalescedRequest
.getSubRequestIds()); //$NON-NLS-1$
305 newCoalescedEventRequest(request
);
310 * Gets the number of background requests in pending queue.
312 * @return the number of background requests in pending queue
314 private int getNbPendingBackgroundRequests() {
315 int nbBackgroundRequests
= 0;
316 synchronized (fLock
) {
317 for (ITmfEventRequest request
: fPendingCoalescedRequests
) {
318 if (request
.getExecType() == ExecutionType
.BACKGROUND
) {
319 nbBackgroundRequests
++;
323 return nbBackgroundRequests
;
326 // ------------------------------------------------------------------------
327 // Request processing
328 // ------------------------------------------------------------------------
// Hands the request over to fExecutor, wrapped in a TmfEventThread.
337 protected void queueRequest(final ITmfEventRequest request
) {
// NOTE(review): special handling for an executor that is already shut down;
// presumably the request is not executed in that case - confirm
339 if (fExecutor
.isShutdown()) {
// thread object bound to this provider and the request
344 TmfEventThread thread
= new TmfEventThread(this, request
);
346 if (TmfCoreTracer
.isRequestTraced()) {
347 TmfCoreTracer
.traceRequest(request
.getRequestId(), "QUEUED"); //$NON-NLS-1$
// submit for execution
350 fExecutor
.execute(thread
);
354 * Initialize the provider based on the request. The context is provider
355 * specific and will be updated by getNext().
359 * @return An application specific context; null if request can't be
// Abstract hook with no implementation in this class: each concrete provider
// returns the ITmfContext to use for the given request.
363 public abstract ITmfContext
armRequest(ITmfEventRequest request
);
366 * Checks if the data meets the request completion criteria.
373 * The number of events read so far
374 * @return true if completion criteria is met
377 public boolean isCompleted(ITmfEventRequest request
, ITmfEvent event
, int nbRead
) {
378 boolean requestCompleted
= isCompleted2(request
, nbRead
);
379 if (!requestCompleted
) {
380 ITmfTimestamp endTime
= request
.getRange().getEndTime();
381 return event
.getTimestamp().compareTo(endTime
) > 0;
383 return requestCompleted
;
386 private static boolean isCompleted2(ITmfEventRequest request
,int nbRead
) {
387 return request
.isCompleted() || nbRead
>= request
.getNbRequested();
390 // ------------------------------------------------------------------------
391 // Pass-throughs to the request executor
392 // ------------------------------------------------------------------------
395 * @return the shutdown state (i.e. if it is accepting new requests)
398 protected boolean executorIsShutdown() {
399 return fExecutor
.isShutdown();
403 * @return the termination state
406 protected boolean executorIsTerminated() {
407 return fExecutor
.isTerminated();
410 // ------------------------------------------------------------------------
412 // ------------------------------------------------------------------------
415 * Handler for the start synch signal
// Reacts to a TmfStartSynchSignal. The handler works under fLock, the same
// monitor sendRequest() holds when it reads fSignalDepth.
// NOTE(review): presumably raises fSignalDepth, mirroring the
// fSignalDepth == 0 check in endSynch() - confirm
421 public void startSynch(TmfStartSynchSignal signal
) {
422 synchronized (fLock
) {
428 * Handler for the end synch signal
434 public void endSynch(TmfEndSynchSignal signal
) {
435 synchronized (fLock
) {
437 if (fSignalDepth
== 0) {