1 /*******************************************************************************
2 * Copyright (c) 2010 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Alvaro Sanchez-Leon (alvsan09@gmail.com) - Initial API and implementation
11 * Marc Dumais (marc.dumais@ericsson.com) - Fix for 316455 (first part)
12 *******************************************************************************/
14 package org
.eclipse
.linuxtools
.internal
.lttng
.core
.control
;
16 import java
.util
.HashMap
;
17 import java
.util
.Iterator
;
18 import java
.util
.List
;
20 import java
.util
.Vector
;
22 import org
.eclipse
.linuxtools
.internal
.lttng
.core
.TraceDebug
;
23 import org
.eclipse
.linuxtools
.internal
.lttng
.core
.event
.LttngEvent
;
24 import org
.eclipse
.linuxtools
.internal
.lttng
.core
.event
.LttngEventType
;
25 import org
.eclipse
.linuxtools
.internal
.lttng
.core
.event
.LttngSyntheticEvent
;
26 import org
.eclipse
.linuxtools
.internal
.lttng
.core
.event
.LttngSyntheticEvent
.SequenceInd
;
27 import org
.eclipse
.linuxtools
.internal
.lttng
.core
.event
.LttngTimestamp
;
28 import org
.eclipse
.linuxtools
.internal
.lttng
.core
.model
.LTTngTreeNode
;
29 import org
.eclipse
.linuxtools
.internal
.lttng
.core
.request
.LttngBaseEventRequest
;
30 import org
.eclipse
.linuxtools
.internal
.lttng
.core
.state
.evProcessor
.ITransEventProcessor
;
31 import org
.eclipse
.linuxtools
.internal
.lttng
.core
.state
.evProcessor
.state
.StateEventToHandlerFactory
;
32 import org
.eclipse
.linuxtools
.internal
.lttng
.core
.state
.model
.LttngTraceState
;
33 import org
.eclipse
.linuxtools
.internal
.lttng
.core
.state
.trace
.IStateTraceManager
;
34 import org
.eclipse
.linuxtools
.tmf
.core
.component
.TmfEventProvider
;
35 import org
.eclipse
.linuxtools
.tmf
.core
.event
.TmfTimeRange
;
36 import org
.eclipse
.linuxtools
.tmf
.core
.event
.TmfTimestamp
;
37 import org
.eclipse
.linuxtools
.tmf
.core
.request
.ITmfDataRequest
;
38 import org
.eclipse
.linuxtools
.tmf
.core
.request
.ITmfEventRequest
;
39 import org
.eclipse
.linuxtools
.tmf
.core
.request
.TmfDataRequest
;
40 import org
.eclipse
.linuxtools
.tmf
.core
.signal
.TmfEndSynchSignal
;
41 import org
.eclipse
.linuxtools
.tmf
.core
.signal
.TmfStartSynchSignal
;
42 import org
.eclipse
.linuxtools
.tmf
.core
.trace
.ITmfContext
;
43 import org
.eclipse
.linuxtools
.tmf
.core
.trace
.ITmfTrace
;
44 import org
.eclipse
.linuxtools
.tmf
.core
.trace
.TmfCheckpoint
;
45 import org
.eclipse
.linuxtools
.tmf
.core
.trace
.TmfContext
;
46 import org
.eclipse
.linuxtools
.tmf
.core
.trace
.TmfExperiment
;
52 public class LttngSyntheticEventProvider
extends TmfEventProvider
<LttngSyntheticEvent
> {
54 // ========================================================================
56 // ========================================================================
57 public static final int BLOCK_SIZE
= 50000;
58 public static final int NB_EVENTS
= 1;
59 public static final int QUEUE_SIZE
= 1; // lttng specific, one event at a time
61 private ITmfDataRequest
<LttngSyntheticEvent
> fmainRequest
= null;
62 private LttngBaseEventRequest fSubRequest
= null;
64 private final List
<IStateTraceManager
> fEventProviderRequests
= new Vector
<IStateTraceManager
>();
66 private final LttngSyntheticEvent fStatusEvent
;
67 volatile boolean startIndSent
= false;
68 private LTTngTreeNode fExperiment
= null;
69 private ITransEventProcessor fstateUpdateProcessor
= StateEventToHandlerFactory
.getInstance();
70 private boolean waitForRequest
= false;
71 long dispatchTime
= 0L;
72 long dispatchIndex
= 0L;
74 private final Map
<ITmfTrace
<?
>, LttngTraceState
> traceToTraceStateModel
= new HashMap
<ITmfTrace
<?
>, LttngTraceState
>();
76 private boolean fIsExperimentNotified
= false;
78 // ========================================================================
80 // ========================================================================
82 * Accessibility to package - use factory instead of this constructor
86 LttngSyntheticEventProvider(Class
<LttngSyntheticEvent
> type
) {
87 super("LttngSyntheticEventProvider", type
, QUEUE_SIZE
); //$NON-NLS-1$
89 // prepare empty instance status indicators and allow them to travel via
91 String source
= this.toString();
92 LttngEventType dtype
= new LttngEventType();
93 LttngTimestamp statusTimeStamp
= new LttngTimestamp(
96 fStatusEvent
= new LttngSyntheticEvent(null, statusTimeStamp
, source
,
97 dtype
, null, null, null);
98 fStatusEvent
.setSequenceInd(SequenceInd
.STARTREQ
);
101 // ========================================================================
103 // ========================================================================
105 @SuppressWarnings("unchecked")
107 public synchronized ITmfContext
armRequest(final ITmfDataRequest
<LttngSyntheticEvent
> request
) {
109 // make sure we have the right type of request
110 if (!(request
instanceof ITmfEventRequest
<?
>)) {
112 TraceDebug
.debug("Request is not an instance of ITmfEventRequest"); //$NON-NLS-1$
116 if (fExperiment
== null) {
117 TraceDebug
.debug("Experiment is null"); //$NON-NLS-1$
122 // get ready to start processing
125 // At least one base provider shall be available
126 if (fEventProviderRequests
.size() < 1) {
128 TraceDebug
.debug("No Base event providers available"); //$NON-NLS-1$
132 fmainRequest
= request
;
134 // define event data handling
135 ITmfEventRequest
<LttngSyntheticEvent
> eventRequest
= (ITmfEventRequest
<LttngSyntheticEvent
>) fmainRequest
;
136 TmfTimeRange reqWindow
= eventRequest
.getRange();
138 TraceDebug
.debug("Main Synthethic event request started on thread: " + Thread
.currentThread().getName()); //$NON-NLS-1$
140 TmfExperiment
<LttngEvent
> experiment
= (TmfExperiment
<LttngEvent
>) fExperiment
.getValue();
141 experiment
.startSynch(new TmfStartSynchSignal(0));
143 TmfTimeRange adjustedRange
= reqWindow
;
144 long adjustedIndex
= eventRequest
.getIndex();
145 int nbRequested
= eventRequest
.getNbRequested();
147 // Figure-out if we need to increase the range of the request: if some
148 // checkpoints are before the beginning of the range, increase the
149 // range to catch them. We will then exercise the state system of
150 // those traces until the requested beginning time range, discarding
151 // the unrequested data.
152 IStateTraceManager traceManager
;
153 Iterator
<IStateTraceManager
> iter
= fEventProviderRequests
.iterator();
154 // For each traceManager in the current experiment...
155 while(iter
.hasNext()) {
156 traceManager
= iter
.next();
157 // restore trace state system to nearest check point
158 TmfCheckpoint checkPoint
= null;
159 if (eventRequest
.getIndex() > 0) {
160 checkPoint
= traceManager
.restoreCheckPointByIndex(eventRequest
.getIndex());
162 checkPoint
= traceManager
.restoreCheckPointByTimestamp(reqWindow
.getStartTime());
165 // validate that the checkpoint restored is within requested bounds
166 // (not outside the current trace's range or after the end of requested range)
167 TmfTimeRange traceRange
= traceManager
.getStateTrace().getTimeRange();
168 if ((checkPoint
== null) ||
169 checkPoint
.getTimestamp().getValue() < traceRange
.getStartTime().getValue() ||
170 checkPoint
.getTimestamp().getValue() > traceRange
.getEndTime().getValue() ||
171 checkPoint
.getTimestamp().getValue() >= reqWindow
.getEndTime().getValue()
173 // checkpoint is out of trace bounds; no need to adjust request for this
177 // use checkpoint time as new startTime for request if it's earlier than
179 if (adjustedRange
.getStartTime().getValue() > checkPoint
.getTimestamp().getValue() || adjustedIndex
> (Long
) checkPoint
.getLocation().getLocation()) {
180 adjustedRange
= new TmfTimeRange(checkPoint
.getTimestamp(), reqWindow
.getEndTime());
181 adjustedIndex
= (Long
) checkPoint
.getLocation().getLocation();
182 if (nbRequested
< TmfDataRequest
.ALL_DATA
) {
183 nbRequested
+= (eventRequest
.getIndex() - adjustedIndex
);
187 // Save which trace state model corresponds to current trace
188 traceToTraceStateModel
.put(traceManager
.getStateTrace(), traceManager
.getStateModel());
191 dispatchTime
= reqWindow
.getStartTime().getValue();
192 dispatchIndex
= eventRequest
.getIndex();
193 eventIndex
= adjustedIndex
;
195 // Create a single request for all traces in the experiment, with coalesced time range.
196 fSubRequest
= new LttngBaseEventRequest(adjustedRange
, reqWindow
.getStartTime(),
197 adjustedIndex
, nbRequested
, BLOCK_SIZE
, eventRequest
.getExecType() /*ITmfDataRequest.ExecutionType.FOREGROUND*/) {
199 private LttngSyntheticEvent syntheticEvent
= null;
204 * @see org.eclipse.linuxtools.lttng.control.LttngEventRequest#handleData()
207 public void handleData(LttngEvent event
) {
208 super.handleData(event
);
210 synchronized (LttngSyntheticEventProvider
.this) {
211 // Check if request was canceled
212 if ((fmainRequest
== null) || (fmainRequest
.isCompleted()) ) {
213 TraceDebug
.debug("fmainRequest was canceled. Ignoring event " + event
); //$NON-NLS-1$
217 handleIncomingData(event
);
220 TraceDebug
.debug("handle data received with no data"); //$NON-NLS-1$
227 * @see org.eclipse.linuxtools.tmf.request.TmfDataRequest#handleCompleted()
230 public void handleCompleted() {
231 // mark this sub-request as completed
232 handleProviderDone(!isCancelled() && !isFailed());
233 super.handleCompleted();
237 * Trigger the Analysis and sequential control of the events.
241 private void handleIncomingData(LttngEvent e
) {
242 long eventTime
= e
.getTimestamp().getValue();
244 ITmfTrace
<?
> inTrace
= e
.getTrace();
245 LttngTraceState traceModel
= traceToTraceStateModel
.get(inTrace
);
247 // queue the new event data
250 // If time at or above requested time, update application
251 if (eventTime
>= dispatchTime
&& eventIndex
>= dispatchIndex
) {
253 syntheticEvent
.setSequenceInd(SequenceInd
.BEFORE
);
254 fmainRequest
.handleData(syntheticEvent
);
256 // Update state locally
257 syntheticEvent
.setSequenceInd(SequenceInd
.UPDATE
);
258 fstateUpdateProcessor
.process(syntheticEvent
, traceModel
);
261 syntheticEvent
.setSequenceInd(SequenceInd
.AFTER
);
262 fmainRequest
.handleData(syntheticEvent
);
265 // event time is between checkpoint adjusted time and
266 // requested time i.e. application does not expect the
267 // event, however the state system needs to be re-built
268 // to the dispatch point
269 syntheticEvent
.setSequenceInd(SequenceInd
.UPDATE
);
270 fstateUpdateProcessor
.process(syntheticEvent
, traceModel
);
276 * Create a synthetic event from the received new reference, if
277 * the reference is the same there is no need for a new instance
279 * if this is the first event for this request, call start
285 private LttngSyntheticEvent
updateSynEvent(LttngEvent e
) {
286 if ((syntheticEvent
== null) || (syntheticEvent
.getBaseEvent() != e
)) {
287 syntheticEvent
= new LttngSyntheticEvent(e
);
290 ITmfTrace
<?
> inTrace
= e
.getTrace();
291 LttngTraceState traceModel
= traceToTraceStateModel
.get(inTrace
);
293 // Trace model needed by application handlers
294 syntheticEvent
.setTraceModel(traceModel
);
296 // send the start request indication once per request thread
298 TraceDebug
.debug("Thread started: " + Thread
.currentThread().getName()); //$NON-NLS-1$
299 handleProviderStarted(traceModel
);
303 return syntheticEvent
;
308 TmfExperiment
<LttngEvent
> provider
= (TmfExperiment
<LttngEvent
>) fExperiment
.getValue();
309 provider
.sendRequest(fSubRequest
);
311 // notify LTTngEvent provider that all requests were sent
312 synchronized (this) {
313 TmfExperiment
.getCurrentExperiment().notifyPendingRequest(false);
314 fIsExperimentNotified
= false;
317 experiment
.endSynch(new TmfEndSynchSignal(0));
319 // Return a dummy context, not used for relay provider
320 return new TmfContext();
324 * Notify listeners to prepare to receive data e.g. clean previous data etc.
326 public synchronized void handleProviderStarted(LttngTraceState traceModel
) {
327 LttngSyntheticEvent startIndEvent
= new LttngSyntheticEvent(fStatusEvent
);
328 startIndEvent
.setSequenceInd(SequenceInd
.STARTREQ
);
330 // Notify application
331 fmainRequest
.handleData(startIndEvent
);
333 // Notify state event processor
334 fstateUpdateProcessor
.process(startIndEvent
, null);
338 * Notify listeners, no more events for the current request will be
339 * distributed e.g. update view.
341 public synchronized void handleProviderDone(boolean isSuccess
) {
342 // Notify application. One notification per trace so the last state of each trace can be
344 for (LttngTraceState traceModel
: traceToTraceStateModel
.values()) {
345 // Take the trace model from traceToTraceStateModel list since it has a copy
347 LttngSyntheticEvent finishEvent
= new LttngSyntheticEvent(fStatusEvent
);
348 finishEvent
.setSequenceInd(SequenceInd
.ENDREQ
);
349 finishEvent
.setTraceModel(traceModel
);
351 fmainRequest
.handleData(finishEvent
);
355 // Finish main request
359 // Cancel main request
360 fmainRequest
.cancel();
366 * Reset provider to a state ready to begin thread execution
368 * @param experimentNode
370 public synchronized void reset(LTTngTreeNode experimentNode
) {
372 conditionallyCancelRequests();
374 fEventProviderRequests
.clear();
375 startIndSent
= false;
377 // set of base event providers
378 if (fExperiment
!= null) {
379 LTTngTreeNode
[] traces
= fExperiment
.getChildren();
380 for (LTTngTreeNode trace
: traces
) {
381 IStateTraceManager traceBaseEventProvider
= (IStateTraceManager
) trace
;
382 fEventProviderRequests
.add(traceBaseEventProvider
);
386 if (fExperiment
!= experimentNode
) {
387 updateExperimentNode(experimentNode
);
392 * Point to a new experiment reference
396 private synchronized void updateExperimentNode(LTTngTreeNode experiment
) {
397 if (experiment
== null)
399 if (experiment
.getValue() instanceof TmfExperiment
<?
>) {
400 fExperiment
= experiment
;
402 TraceDebug
.debug("Experiment received is not instance of TmfExperiment: " //$NON-NLS-1$
403 + experiment
.getClass().getName());
411 * org.eclipse.linuxtools.tmf.component.TmfDataProvider#sendRequest(org.
412 * eclipse.linuxtools.tmf.request.TmfDataRequest)
415 public void sendRequest(final ITmfDataRequest
<LttngSyntheticEvent
> request
) {
416 synchronized (this) {
417 if (!fIsExperimentNotified
) {
418 @SuppressWarnings("unchecked")
419 TmfExperiment
<LttngSyntheticEvent
> experiment
= (TmfExperiment
<LttngSyntheticEvent
>) TmfExperiment
.getCurrentExperiment();
420 if (experiment
!= null) {
421 experiment
.notifyPendingRequest(true);
422 fIsExperimentNotified
= true;
427 super.sendRequest(request
);
428 if (waitForRequest
) {
430 request
.waitForCompletion();
431 } catch (InterruptedException e
) {
438 * @return the waitForRequest
440 public boolean isWaitForRequest() {
441 return waitForRequest
;
445 * @param waitForRequest
446 * configures the provider to wait for the request completion
448 public void setWaitForRequest(boolean waitForRequest
) {
449 this.waitForRequest
= waitForRequest
;
453 public LttngSyntheticEvent
getNext(ITmfContext context
) {
455 fmainRequest
.waitForCompletion();
456 } catch (InterruptedException e
) {
463 * Cancels the ongoing requests for this data provider if necessary
465 public synchronized void conditionallyCancelRequests() {
466 if ((fSubRequest
!= null) && (!fSubRequest
.isCompleted())) {
468 TraceDebug
.debug("Canceling synthethic event request!"); //$NON-NLS-1$
470 // This will also cancel the fmainRequest
471 fSubRequest
.cancel();
472 // Reset the request references
479 protected void queueBackgroundRequest(ITmfDataRequest
<LttngSyntheticEvent
> request
, int blockSize
, boolean indexing
) {
480 // do not split background synthetic requests
481 queueRequest(request
);