1 /*******************************************************************************
2 * Copyright (c) 2009, 2015 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Francois Chouinard - Initial API and implementation, replace background
11 * requests by preemptable requests
12 * Alexandre Montplaisir - Merge with TmfDataProvider
13 * Bernd Hufmann - Add timer based coalescing for background requests
14 *******************************************************************************/
16 package org
.eclipse
.tracecompass
.tmf
.core
.component
;
18 import static org
.eclipse
.tracecompass
.common
.core
.NonNullUtils
.checkNotNull
;
20 import java
.util
.ArrayList
;
21 import java
.util
.Collections
;
22 import java
.util
.Iterator
;
23 import java
.util
.LinkedList
;
24 import java
.util
.List
;
25 import java
.util
.Timer
;
26 import java
.util
.TimerTask
;
28 import org
.eclipse
.jdt
.annotation
.NonNull
;
29 import org
.eclipse
.tracecompass
.common
.core
.NonNullUtils
;
30 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.TmfCoreTracer
;
31 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.component
.TmfEventThread
;
32 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.component
.TmfProviderManager
;
33 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.request
.TmfCoalescedEventRequest
;
34 import org
.eclipse
.tracecompass
.internal
.tmf
.core
.request
.TmfRequestExecutor
;
35 import org
.eclipse
.tracecompass
.tmf
.core
.event
.ITmfEvent
;
36 import org
.eclipse
.tracecompass
.tmf
.core
.filter
.ITmfFilter
;
37 import org
.eclipse
.tracecompass
.tmf
.core
.request
.ITmfEventRequest
;
38 import org
.eclipse
.tracecompass
.tmf
.core
.request
.ITmfEventRequest
.ExecutionType
;
39 import org
.eclipse
.tracecompass
.tmf
.core
.signal
.TmfEndSynchSignal
;
40 import org
.eclipse
.tracecompass
.tmf
.core
.signal
.TmfSignalHandler
;
41 import org
.eclipse
.tracecompass
.tmf
.core
.signal
.TmfStartSynchSignal
;
42 import org
.eclipse
.tracecompass
.tmf
.core
.timestamp
.ITmfTimestamp
;
43 import org
.eclipse
.tracecompass
.tmf
.core
.trace
.ITmfContext
;
46 * An abstract base class that implements ITmfEventProvider.
48 * This abstract class implements the housekeeping methods to register/
49 * de-register the event provider and to handle generically the event requests.
52 * @author Francois Chouinard
54 public abstract class TmfEventProvider
extends TmfComponent
implements ITmfEventProvider
, ITmfFilter
{
56 // ------------------------------------------------------------------------
58 // ------------------------------------------------------------------------
60 /** Default amount of events per request "chunk" */
61 public static final int DEFAULT_BLOCK_SIZE
= 50000;
63 /** Delay for coalescing background requests (in milliseconds) */
64 private static final long DELAY
= 1000;
66 // ------------------------------------------------------------------------
68 // ------------------------------------------------------------------------
70 /** List of coalesced requests */
71 private final List
<TmfCoalescedEventRequest
> fPendingCoalescedRequests
= new LinkedList
<>();
73 /** The type of event handled by this provider */
74 private Class
<?
extends ITmfEvent
> fType
;
76 private final TmfRequestExecutor fExecutor
;
78 private final Object fLock
= new Object();
80 private int fSignalDepth
= 0;
82 private int fRequestPendingCounter
= 0;
86 /** Current timer task */
87 @NonNull private TimerTask fCurrentTask
= new TimerTask() { @Override public void run() {} };
89 private boolean fIsTimerEnabled
;
92 * The parent event provider.
94 private TmfEventProvider fParent
= null;
96 * The list of child event providers.
98 private final List
<TmfEventProvider
> fChildren
= Collections
.synchronizedList(new ArrayList
<TmfEventProvider
>());
100 // ------------------------------------------------------------------------
102 // ------------------------------------------------------------------------
105 * Default constructor
107 public TmfEventProvider() {
109 setTimerEnabled(true);
110 fExecutor
= new TmfRequestExecutor();
114 * Standard constructor. Instantiate and initialize at the same time.
117 * Name of the provider
119 * The type of events that will be handled
121 public TmfEventProvider(String name
, Class
<?
extends ITmfEvent
> type
) {
127 * Initialize this data provider
130 * Name of the provider
132 * The type of events that will be handled
134 public void init(String name
, Class
<?
extends ITmfEvent
> type
) {
141 synchronized (fLock
) {
142 fTimer
= new Timer();
145 TmfProviderManager
.register(fType
, this);
149 public void dispose() {
150 TmfProviderManager
.deregister(fType
, this);
152 synchronized (fLock
) {
153 if (fTimer
!= null) {
159 synchronized (fChildren
) {
160 for (TmfEventProvider child
: fChildren
) {
165 clearPendingRequests();
169 // ------------------------------------------------------------------------
171 // ------------------------------------------------------------------------
177 public Class
<?
extends ITmfEvent
> getEventType() {
181 // ------------------------------------------------------------------------
182 // ITmfRequestHandler
183 // ------------------------------------------------------------------------
186 public void sendRequest(final ITmfEventRequest request
) {
187 synchronized (fLock
) {
189 if (TmfCoreTracer
.isRequestTraced()) {
190 TmfCoreTracer
.traceRequest(request
.getRequestId(), "SENT to provider " + getName()); //$NON-NLS-1$
193 if (request
.getProviderFilter() == null) {
194 request
.setProviderFilter(this);
197 if (sendWithParent(request
)) {
201 if (request
.getExecType() == ExecutionType
.FOREGROUND
) {
202 if ((fSignalDepth
> 0) || (fRequestPendingCounter
> 0)) {
203 coalesceEventRequest(request
);
205 queueRequest(request
);
211 * Dispatch request in case timer is not running.
213 if (fTimer
== null) {
214 queueRequest(request
);
218 coalesceEventRequest(request
);
220 if (fIsTimerEnabled
) {
221 fCurrentTask
.cancel();
222 fCurrentTask
= new TimerTask() {
225 synchronized (fLock
) {
230 fTimer
.schedule(fCurrentTask
, DELAY
);
235 private void fireRequest(boolean isTimeout
) {
236 synchronized (fLock
) {
237 if (fRequestPendingCounter
> 0) {
241 if (fPendingCoalescedRequests
.size() > 0) {
242 Iterator
<TmfCoalescedEventRequest
> iter
= fPendingCoalescedRequests
.iterator();
243 while (iter
.hasNext()) {
244 ExecutionType type
= (isTimeout ? ExecutionType
.BACKGROUND
: ExecutionType
.FOREGROUND
);
245 ITmfEventRequest request
= iter
.next();
246 if (type
== request
.getExecType()) {
247 queueRequest(request
);
256 * Increments/decrements the pending requests counters and fires the request
257 * if necessary (counter == 0). Used for coalescing requests across multiple
261 * Should we increment (true) or decrement (false) the pending
265 public void notifyPendingRequest(boolean isIncrement
) {
266 synchronized (fLock
) {
268 fRequestPendingCounter
++;
270 if (fRequestPendingCounter
> 0) {
271 fRequestPendingCounter
--;
274 // fire request if all pending requests are received
275 if (fRequestPendingCounter
== 0) {
283 // ------------------------------------------------------------------------
285 // ------------------------------------------------------------------------
288 * Create a new request from an existing one, and add it to the coalesced
292 * The request to copy
294 protected void newCoalescedEventRequest(ITmfEventRequest request
) {
295 synchronized (fLock
) {
296 TmfCoalescedEventRequest coalescedRequest
= new TmfCoalescedEventRequest(
297 request
.getDataType(),
300 request
.getNbRequested(),
301 request
.getExecType());
302 coalescedRequest
.addRequest(request
);
303 coalescedRequest
.setProviderFilter(this);
304 if (TmfCoreTracer
.isRequestTraced()) {
305 TmfCoreTracer
.traceRequest(request
.getRequestId(), "COALESCED with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
306 TmfCoreTracer
.traceRequest(coalescedRequest
.getRequestId(), "now contains " + coalescedRequest
.getSubRequestIds()); //$NON-NLS-1$
308 coalesceChildrenRequests(coalescedRequest
);
309 fPendingCoalescedRequests
.add(coalescedRequest
);
314 * Add an existing request to the list of coalesced ones
317 * The request to add to the list
319 protected void coalesceEventRequest(ITmfEventRequest request
) {
320 synchronized (fLock
) {
321 for (TmfCoalescedEventRequest coalescedRequest
: getPendingRequests()) {
322 if (coalescedRequest
.isCompatible(request
)) {
323 coalescedRequest
.addRequest(request
);
324 if (TmfCoreTracer
.isRequestTraced()) {
325 TmfCoreTracer
.traceRequest(request
.getRequestId(), "COALESCED with " + coalescedRequest
.getRequestId()); //$NON-NLS-1$
326 TmfCoreTracer
.traceRequest(coalescedRequest
.getRequestId(), "now contains " + coalescedRequest
.getSubRequestIds()); //$NON-NLS-1$
328 coalesceChildrenRequests(coalescedRequest
);
332 newCoalescedEventRequest(request
);
337 * Sends a request with the parent if compatible.
339 private boolean sendWithParent(final ITmfEventRequest request
) {
340 ITmfEventProvider parent
= getParent();
341 if (parent
instanceof TmfEventProvider
) {
342 return ((TmfEventProvider
) parent
).sendIfCompatible(request
);
348 * Sends a request if compatible with a pending coalesced request.
350 private boolean sendIfCompatible(ITmfEventRequest request
) {
351 synchronized (fLock
) {
352 for (TmfCoalescedEventRequest coalescedRequest
: getPendingRequests()) {
353 if (coalescedRequest
.isCompatible(request
)) {
354 // Send so it can be coalesced with the parent(s)
355 sendRequest(request
);
360 return sendWithParent(request
);
364 * Coalesces children requests with given request if compatible.
366 private void coalesceChildrenRequests(final TmfCoalescedEventRequest request
) {
367 synchronized (fChildren
) {
368 for (TmfEventProvider child
: fChildren
) {
369 child
.coalesceCompatibleRequests(request
);
376 * Coalesces all pending requests that are compatible with coalesced request.
378 private void coalesceCompatibleRequests(TmfCoalescedEventRequest request
) {
379 Iterator
<TmfCoalescedEventRequest
> iter
= getPendingRequests().iterator();
380 while (iter
.hasNext()) {
381 TmfCoalescedEventRequest pendingRequest
= iter
.next();
382 if (request
.isCompatible(pendingRequest
)) {
383 request
.addRequest(pendingRequest
);
384 if (TmfCoreTracer
.isRequestTraced()) {
385 TmfCoreTracer
.traceRequest(pendingRequest
.getRequestId(), "COALESCED with " + request
.getRequestId()); //$NON-NLS-1$
386 TmfCoreTracer
.traceRequest(request
.getRequestId(), "now contains " + request
.getSubRequestIds()); //$NON-NLS-1$
393 // ------------------------------------------------------------------------
394 // Request processing
395 // ------------------------------------------------------------------------
403 protected void queueRequest(final ITmfEventRequest request
) {
405 if (fExecutor
.isShutdown()) {
410 TmfEventThread thread
= new TmfEventThread(this, request
);
412 if (TmfCoreTracer
.isRequestTraced()) {
413 TmfCoreTracer
.traceRequest(request
.getRequestId(), "QUEUED"); //$NON-NLS-1$
416 fExecutor
.execute(thread
);
420 * Initialize the provider based on the request. The context is provider
421 * specific and will be updated by getNext().
425 * @return An application specific context; null if request can't be
428 public abstract ITmfContext
armRequest(ITmfEventRequest request
);
431 * Checks if the data meets the request completion criteria.
438 * The number of events read so far
439 * @return true if completion criteria is met
441 public boolean isCompleted(ITmfEventRequest request
, ITmfEvent event
, int nbRead
) {
442 boolean requestCompleted
= isCompleted2(request
, nbRead
);
443 if (!requestCompleted
) {
444 ITmfTimestamp endTime
= request
.getRange().getEndTime();
445 return event
.getTimestamp().compareTo(endTime
) > 0;
447 return requestCompleted
;
450 private static boolean isCompleted2(ITmfEventRequest request
,int nbRead
) {
451 return request
.isCompleted() || nbRead
>= request
.getNbRequested();
454 // ------------------------------------------------------------------------
455 // Pass-throughs to the request executor
456 // ------------------------------------------------------------------------
459 * @return the shutdown state (i.e. if it is accepting new requests)
461 protected boolean executorIsShutdown() {
462 return fExecutor
.isShutdown();
466 * @return the termination state
468 protected boolean executorIsTerminated() {
469 return fExecutor
.isTerminated();
472 // ------------------------------------------------------------------------
474 // ------------------------------------------------------------------------
477 * Handler for the start synch signal
483 public void startSynch(TmfStartSynchSignal signal
) {
484 synchronized (fLock
) {
490 * Handler for the end synch signal
496 public void endSynch(TmfEndSynchSignal signal
) {
497 synchronized (fLock
) {
499 if (fSignalDepth
== 0) {
506 public ITmfEventProvider
getParent() {
507 synchronized (fLock
) {
513 public void setParent(ITmfEventProvider parent
) {
514 if (!(parent
instanceof TmfEventProvider
)) {
515 throw new IllegalArgumentException();
518 synchronized (fLock
) {
519 fParent
= (TmfEventProvider
) parent
;
524 public List
<ITmfEventProvider
> getChildren() {
525 synchronized (fChildren
) {
526 List
<ITmfEventProvider
> list
= new ArrayList
<>();
527 list
.addAll(fChildren
);
533 public <T
extends ITmfEventProvider
> List
<T
> getChildren(Class
<T
> clazz
) {
534 List
<@NonNull T
> list
= new ArrayList
<>();
535 synchronized (fChildren
) {
536 for (TmfEventProvider child
: fChildren
) {
537 if (clazz
.isAssignableFrom(child
.getClass())) {
538 list
.add(checkNotNull(clazz
.cast(child
)));
546 public ITmfEventProvider
getChild(String name
) {
547 synchronized (fChildren
) {
548 for (TmfEventProvider child
: fChildren
) {
549 if (child
.getName().equals(name
)) {
558 public ITmfEventProvider
getChild(int index
) {
559 return NonNullUtils
.checkNotNull(fChildren
.get(index
));
563 public void addChild(ITmfEventProvider child
) {
564 if (!(child
instanceof TmfEventProvider
)) {
565 throw new IllegalArgumentException();
567 child
.setParent(this);
568 fChildren
.add((TmfEventProvider
)child
);
572 public int getNbChildren() {
573 return fChildren
.size();
577 * Returns true if an event was provided by this event provider or one of
578 * its children event providers else false.
582 * @return <code>true</code> if event was provided by this provider or one
583 * of its children else <code>false</code>
586 public boolean matches(ITmfEvent event
) {
587 if ((event
.getTrace() == this)) {
590 if (fChildren
.size() > 0) {
591 synchronized (fLock
) {
592 List
<TmfEventProvider
> children
= getChildren(TmfEventProvider
.class);
593 for (TmfEventProvider child
: children
) {
594 if (child
.matches(event
)) {
603 // ------------------------------------------------------------------------
604 // Debug code (will also be used in tests using reflection)
605 // ------------------------------------------------------------------------
608 * Gets a list of all pending requests. Debug code.
610 * @return a list of all pending requests
612 private List
<TmfCoalescedEventRequest
> getPendingRequests() {
613 return fPendingCoalescedRequests
;
617 * Clears all pending requests. Debug code.
619 private void clearPendingRequests() {
620 fPendingCoalescedRequests
.clear();
624 * Enables/disables the timer. Debug code.
627 * the enable flag to set
629 private void setTimerEnabled(Boolean enabled
) {
630 fIsTimerEnabled
= enabled
;