/*******************************************************************************
- * Copyright (c) 2009, 2014 Ericsson
+ * Copyright (c) 2009, 2015 Ericsson
*
* All rights reserved. This program and the accompanying materials are
* made available under the terms of the Eclipse Public License v1.0 which
package org.eclipse.tracecompass.tmf.core.component;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.tracecompass.common.core.NonNullUtils;
import org.eclipse.tracecompass.internal.tmf.core.TmfCoreTracer;
import org.eclipse.tracecompass.internal.tmf.core.component.TmfEventThread;
import org.eclipse.tracecompass.internal.tmf.core.component.TmfProviderManager;
import org.eclipse.tracecompass.internal.tmf.core.request.TmfCoalescedEventRequest;
import org.eclipse.tracecompass.internal.tmf.core.request.TmfRequestExecutor;
import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
+import org.eclipse.tracecompass.tmf.core.filter.ITmfFilter;
import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest;
import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest.ExecutionType;
import org.eclipse.tracecompass.tmf.core.signal.TmfEndSynchSignal;
* </p>
*
* @author Francois Chouinard
- * @since 3.0
*/
-public abstract class TmfEventProvider extends TmfComponent implements ITmfEventProvider {
+public abstract class TmfEventProvider extends TmfComponent implements ITmfEventProvider, ITmfFilter {
// ------------------------------------------------------------------------
// Constants
// ------------------------------------------------------------------------
- /** Default amount of events per request "chunk"
- * @since 3.0 */
+ /** Default amount of events per request "chunk" */
public static final int DEFAULT_BLOCK_SIZE = 50000;
/** Delay for coalescing background requests (in milli-seconds) */
private Timer fTimer;
- private boolean fIsTimeout = false;
+ /** Current timer task */
+ @NonNull private TimerTask fCurrentTask = new TimerTask() { @Override public void run() {} };
+
+ private boolean fIsTimerEnabled;
+
+ /**
+ * The parent event provider.
+ */
+ private TmfEventProvider fParent = null;
+ /**
+ * The list if children event provider.
+ */
+ private final List<TmfEventProvider> fChildren = Collections.synchronizedList(new ArrayList<TmfEventProvider>());
// ------------------------------------------------------------------------
// Constructors
*/
public TmfEventProvider() {
super();
+ setTimerEnabled(true);
fExecutor = new TmfRequestExecutor();
}
}
fTimer = null;
}
+
+ synchronized (fChildren) {
+ for (TmfEventProvider child : fChildren) {
+ child.dispose();
+ }
+ fChildren.clear();
+ }
+ clearPendingRequests();
super.dispose();
}
// ITmfRequestHandler
// ------------------------------------------------------------------------
- /**
- * @since 3.0
- */
@Override
public void sendRequest(final ITmfEventRequest request) {
synchronized (fLock) {
+
+ if (TmfCoreTracer.isRequestTraced()) {
+ TmfCoreTracer.traceRequest(request.getRequestId(), "SENT to provider " + getName()); //$NON-NLS-1$
+ }
+
+ if (request.getProviderFilter() == null) {
+ request.setProviderFilter(this);
+ }
+
+ if (sendWithParent(request)) {
+ return;
+ }
+
if (request.getExecType() == ExecutionType.FOREGROUND) {
if ((fSignalDepth > 0) || (fRequestPendingCounter > 0)) {
coalesceEventRequest(request);
return;
}
- /*
- * For the first background request in the request pending queue
- * a timer will be started to allow other background requests to
- * coalesce.
- */
- boolean startTimer = (getNbPendingBackgroundRequests() == 0);
coalesceEventRequest(request);
- if (startTimer) {
- TimerTask task = new TimerTask() {
+
+ if (fIsTimerEnabled) {
+ fCurrentTask.cancel();
+ fCurrentTask = new TimerTask() {
@Override
public void run() {
synchronized (fLock) {
- fIsTimeout = true;
- fireRequest();
+ fireRequest(true);
}
}
};
- fTimer.schedule(task, DELAY);
+ fTimer.schedule(fCurrentTask, DELAY);
}
}
}
- private void fireRequest() {
+ private void fireRequest(boolean isTimeout) {
synchronized (fLock) {
if (fRequestPendingCounter > 0) {
return;
if (fPendingCoalescedRequests.size() > 0) {
Iterator<TmfCoalescedEventRequest> iter = fPendingCoalescedRequests.iterator();
while (iter.hasNext()) {
- ExecutionType type = (fIsTimeout ? ExecutionType.BACKGROUND : ExecutionType.FOREGROUND);
+ ExecutionType type = (isTimeout ? ExecutionType.BACKGROUND : ExecutionType.FOREGROUND);
ITmfEventRequest request = iter.next();
if (type == request.getExecType()) {
queueRequest(request);
// fire request if all pending requests are received
if (fRequestPendingCounter == 0) {
- fireRequest();
+ fireRequest(false);
+ fireRequest(true);
}
}
}
*
* @param request
* The request to copy
- * @since 3.0
*/
protected void newCoalescedEventRequest(ITmfEventRequest request) {
synchronized (fLock) {
request.getNbRequested(),
request.getExecType());
coalescedRequest.addRequest(request);
+ coalescedRequest.setProviderFilter(this);
if (TmfCoreTracer.isRequestTraced()) {
- TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
- TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
+ TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
+ TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
}
+ coalesceChildrenRequests(coalescedRequest);
fPendingCoalescedRequests.add(coalescedRequest);
}
}
*
* @param request
* The request to add to the list
- * @since 3.0
*/
protected void coalesceEventRequest(ITmfEventRequest request) {
synchronized (fLock) {
- for (TmfCoalescedEventRequest coalescedRequest : fPendingCoalescedRequests) {
+ for (TmfCoalescedEventRequest coalescedRequest : getPendingRequests()) {
if (coalescedRequest.isCompatible(request)) {
coalescedRequest.addRequest(request);
if (TmfCoreTracer.isRequestTraced()) {
- TmfCoreTracer.traceRequest(request, "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
- TmfCoreTracer.traceRequest(coalescedRequest, "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
+ TmfCoreTracer.traceRequest(request.getRequestId(), "COALESCED with " + coalescedRequest.getRequestId()); //$NON-NLS-1$
+ TmfCoreTracer.traceRequest(coalescedRequest.getRequestId(), "now contains " + coalescedRequest.getSubRequestIds()); //$NON-NLS-1$
}
+ coalesceChildrenRequests(coalescedRequest);
return;
}
}
}
}
- /**
- * Gets the number of background requests in pending queue.
- *
- * @return the number of background requests in pending queue
+ /*
+ * Sends a request with the parent if compatible.
*/
- private int getNbPendingBackgroundRequests() {
- int nbBackgroundRequests = 0;
+ private boolean sendWithParent(final ITmfEventRequest request) {
+ ITmfEventProvider parent = getParent();
+ if (parent instanceof TmfEventProvider) {
+ return ((TmfEventProvider) parent).sendIfCompatible(request);
+ }
+ return false;
+ }
+
+ /*
+ * Sends a request if compatible with a pending coalesced request.
+ */
+ private boolean sendIfCompatible(ITmfEventRequest request) {
synchronized (fLock) {
- for (ITmfEventRequest request : fPendingCoalescedRequests) {
- if (request.getExecType() == ExecutionType.BACKGROUND) {
- nbBackgroundRequests++;
+ for (TmfCoalescedEventRequest coalescedRequest : getPendingRequests()) {
+ if (coalescedRequest.isCompatible(request)) {
+ // Send so it can be coalesced with the parent(s)
+ sendRequest(request);
+ return true;
+ }
+ }
+ }
+ return sendWithParent(request);
+ }
+
+ /*
+ * Coalesces children requests with given request if compatible.
+ */
+ private void coalesceChildrenRequests(final TmfCoalescedEventRequest request) {
+ synchronized (fChildren) {
+ for (TmfEventProvider child : fChildren) {
+ child.coalesceCompatibleRequests(request);
+ }
+ }
+ }
+
+
+ /*
+ * Coalesces all pending requests that are compatible with coalesced request.
+ */
+ private void coalesceCompatibleRequests(TmfCoalescedEventRequest request) {
+ Iterator<TmfCoalescedEventRequest> iter = getPendingRequests().iterator();
+ while (iter.hasNext()) {
+ TmfCoalescedEventRequest pendingRequest = iter.next();
+ if (request.isCompatible(pendingRequest)) {
+ request.addRequest(pendingRequest);
+ if (TmfCoreTracer.isRequestTraced()) {
+ TmfCoreTracer.traceRequest(pendingRequest.getRequestId(), "COALESCED with " + request.getRequestId()); //$NON-NLS-1$
+ TmfCoreTracer.traceRequest(request.getRequestId(), "now contains " + request.getSubRequestIds()); //$NON-NLS-1$
}
+ iter.remove();
}
}
- return nbBackgroundRequests;
}
// ------------------------------------------------------------------------
*
* @param request
* The data request
- * @since 3.0
*/
protected void queueRequest(final ITmfEventRequest request) {
TmfEventThread thread = new TmfEventThread(this, request);
if (TmfCoreTracer.isRequestTraced()) {
- TmfCoreTracer.traceRequest(request, "QUEUED"); //$NON-NLS-1$
+ TmfCoreTracer.traceRequest(request.getRequestId(), "QUEUED"); //$NON-NLS-1$
}
fExecutor.execute(thread);
* The request
* @return An application specific context; null if request can't be
* serviced
- * @since 3.0
*/
public abstract ITmfContext armRequest(ITmfEventRequest request);
* @param nbRead
* The number of events read so far
* @return true if completion criteria is met
- * @since 3.0
*/
public boolean isCompleted(ITmfEventRequest request, ITmfEvent event, int nbRead) {
boolean requestCompleted = isCompleted2(request, nbRead);
if (!requestCompleted) {
ITmfTimestamp endTime = request.getRange().getEndTime();
- return event.getTimestamp().compareTo(endTime, false) > 0;
+ return event.getTimestamp().compareTo(endTime) > 0;
}
return requestCompleted;
}
/**
* @return the shutdown state (i.e. if it is accepting new requests)
- * @since 2.0
*/
protected boolean executorIsShutdown() {
return fExecutor.isShutdown();
/**
* @return the termination state
- * @since 2.0
*/
protected boolean executorIsTerminated() {
return fExecutor.isTerminated();
synchronized (fLock) {
fSignalDepth--;
if (fSignalDepth == 0) {
- fIsTimeout = false;
- fireRequest();
+ fireRequest(false);
}
}
}
+ @Override
+ public ITmfEventProvider getParent() {
+ synchronized (fLock) {
+ return fParent;
+ }
+ }
+
+ @Override
+ public void setParent(ITmfEventProvider parent) {
+ if (!(parent instanceof TmfEventProvider)) {
+ throw new IllegalArgumentException();
+ }
+
+ synchronized (fLock) {
+ fParent = (TmfEventProvider) parent;
+ }
+ }
+
+ @Override
+ public List<ITmfEventProvider> getChildren() {
+ synchronized (fChildren) {
+ List<ITmfEventProvider> list = new ArrayList<>();
+ list.addAll(fChildren);
+ return list;
+ }
+ }
+
+ @Override
+ public <T extends ITmfEventProvider> List<T> getChildren(Class<T> clazz) {
+ List<T> list = new ArrayList<>();
+ synchronized (fChildren) {
+ for (TmfEventProvider child : fChildren) {
+ if (clazz.isAssignableFrom(child.getClass())) {
+ list.add(clazz.cast(child));
+ }
+ }
+ }
+ return list;
+ }
+
+ @Override
+ public ITmfEventProvider getChild(String name) {
+ synchronized (fChildren) {
+ for (TmfEventProvider child : fChildren) {
+ if (child.getName().equals(name)) {
+ return child;
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public ITmfEventProvider getChild(int index) {
+ return NonNullUtils.checkNotNull(fChildren.get(index));
+ }
+
+ @Override
+ public void addChild(ITmfEventProvider child) {
+ if (!(child instanceof TmfEventProvider)) {
+ throw new IllegalArgumentException();
+ }
+ child.setParent(this);
+ fChildren.add((TmfEventProvider)child);
+ }
+
+ @Override
+ public int getNbChildren() {
+ return fChildren.size();
+ }
+
+ /**
+ * Returns true if an event was provided by this event provider or one of
+ * its children event providers else false.
+ *
+ * @param event
+ * the event to check
+ * @return <code>true</code> if event was provided by this provider or one
+ * of its children else <code>false</code>
+ */
+ @Override
+ public boolean matches(ITmfEvent event) {
+ if ((event.getTrace() == this)) {
+ return true;
+ }
+ if (fChildren.size() > 0) {
+ synchronized (fLock) {
+ List <TmfEventProvider> children = getChildren(TmfEventProvider.class);
+ for (TmfEventProvider child : children) {
+ if (child.matches(event)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ // ------------------------------------------------------------------------
+ // Debug code (will also used in tests using reflection)
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets a list of all pending requests. Debug code.
+ *
+ * @return a list of all pending requests
+ */
+ private List<TmfCoalescedEventRequest> getPendingRequests() {
+ return fPendingCoalescedRequests;
+ }
+
+ /**
+ * Clears all pending requests. Debug code.
+ */
+ private void clearPendingRequests() {
+ fPendingCoalescedRequests.clear();
+ }
+
+ /**
+ * Enables/disables the timer. Debug code.
+ *
+ * @param enabled
+ * the enable flag to set
+ */
+ private void setTimerEnabled(Boolean enabled) {
+ fIsTimerEnabled = enabled;
+ }
}