1 /*******************************************************************************
2 * Copyright (c) 2010 Ericsson
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Alvaro Sanchez-Leon (alvsan09@gmail.com) - Initial API and implementation
11 * Marc Dumais (marc.dumais@ericsson.com) - Fix for 316455 (first part)
12 *******************************************************************************/
14 package org
.eclipse
.linuxtools
.lttng
.control
;
16 import java
.util
.HashMap
;
17 import java
.util
.Iterator
;
18 import java
.util
.List
;
20 import java
.util
.Vector
;
22 import org
.eclipse
.linuxtools
.lttng
.TraceDebug
;
23 import org
.eclipse
.linuxtools
.lttng
.event
.LttngEvent
;
24 import org
.eclipse
.linuxtools
.lttng
.event
.LttngEventType
;
25 import org
.eclipse
.linuxtools
.lttng
.event
.LttngSyntheticEvent
;
26 import org
.eclipse
.linuxtools
.lttng
.event
.LttngSyntheticEvent
.SequenceInd
;
27 import org
.eclipse
.linuxtools
.lttng
.event
.LttngTimestamp
;
28 import org
.eclipse
.linuxtools
.lttng
.model
.LTTngTreeNode
;
29 import org
.eclipse
.linuxtools
.lttng
.request
.LttngBaseEventRequest
;
30 import org
.eclipse
.linuxtools
.lttng
.state
.evProcessor
.ITransEventProcessor
;
31 import org
.eclipse
.linuxtools
.lttng
.state
.evProcessor
.state
.StateEventToHandlerFactory
;
32 import org
.eclipse
.linuxtools
.lttng
.state
.model
.LttngTraceState
;
33 import org
.eclipse
.linuxtools
.lttng
.state
.trace
.IStateTraceManager
;
34 import org
.eclipse
.linuxtools
.tmf
.component
.TmfEventProvider
;
35 import org
.eclipse
.linuxtools
.tmf
.event
.TmfEventSource
;
36 import org
.eclipse
.linuxtools
.tmf
.event
.TmfTimeRange
;
37 import org
.eclipse
.linuxtools
.tmf
.event
.TmfTimestamp
;
38 import org
.eclipse
.linuxtools
.tmf
.experiment
.TmfExperiment
;
39 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfDataRequest
;
40 import org
.eclipse
.linuxtools
.tmf
.request
.ITmfEventRequest
;
41 import org
.eclipse
.linuxtools
.tmf
.request
.TmfDataRequest
;
42 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfEndSynchSignal
;
43 import org
.eclipse
.linuxtools
.tmf
.signal
.TmfStartSynchSignal
;
44 import org
.eclipse
.linuxtools
.tmf
.trace
.ITmfContext
;
45 import org
.eclipse
.linuxtools
.tmf
.trace
.ITmfTrace
;
46 import org
.eclipse
.linuxtools
.tmf
.trace
.TmfCheckpoint
;
47 import org
.eclipse
.linuxtools
.tmf
.trace
.TmfContext
;
53 public class LttngSyntheticEventProvider
extends TmfEventProvider
<LttngSyntheticEvent
> {
// Constants and per-instance state of the synthetic event provider.
55 // ========================================================================
57 // ========================================================================
// Block size handed to the LttngBaseEventRequest created in armRequest().
58 public static final int BLOCK_SIZE
= 50000;
59 public static final int NB_EVENTS
= 1;
// Queue size passed to the TmfEventProvider super constructor.
60 public static final int QUEUE_SIZE
= 1; // lttng specific, one event at a time
// Request currently being served; assigned in armRequest().
62 private ITmfDataRequest
<LttngSyntheticEvent
> fmainRequest
= null;
// Base-event request built in armRequest() and sent to the experiment.
63 private LttngBaseEventRequest fSubRequest
= null;
// State trace managers collected from the experiment node's children in reset().
65 private final List
<IStateTraceManager
> fEventProviderRequests
= new Vector
<IStateTraceManager
>();
// Template event that is copied to build the STARTREQ / ENDREQ indications.
67 private final LttngSyntheticEvent fStatusEvent
;
// Cleared in reset(). NOTE(review): presumably guards the once-per-request
// start indication - the code that sets it is not visible here; confirm.
68 volatile boolean startIndSent
= false;
// Experiment tree node; replaced through updateExperimentNode().
69 private LTTngTreeNode fExperiment
= null;
// Processor that applies events to the trace state model.
70 private ITransEventProcessor fstateUpdateProcessor
= StateEventToHandlerFactory
.getInstance();
// When true, sendRequest() waits for the request to complete before returning.
71 private boolean waitForRequest
= false;
// Start time and index of the original request; armRequest() sets them and
// handleIncomingData() forwards only events at or beyond both values.
72 long dispatchTime
= 0L;
73 long dispatchIndex
= 0L;
// Trace -> state model lookup, filled in armRequest() for every trace manager.
75 private final Map
<ITmfTrace
, LttngTraceState
> traceToTraceStateModel
= new HashMap
<ITmfTrace
, LttngTraceState
>();
// True between notifyPendingRequest(true) in sendRequest() and
// notifyPendingRequest(false) in armRequest().
77 private boolean fIsExperimentNotified
= false;
79 // ========================================================================
81 // ========================================================================
83 * Accessibility to package - use factory instead of this constructor
// Package-visible constructor. Delegates to the TmfEventProvider constructor
// with a fixed provider name, the event type and QUEUE_SIZE, then builds the
// reusable status event used as template for request indications.
87 LttngSyntheticEventProvider(Class
<LttngSyntheticEvent
> type
) {
88 super("LttngSyntheticEventProvider", type
, QUEUE_SIZE
); //$NON-NLS-1$
90 // prepare empty instance status indicators and allow them to travel via
// The status event names this provider as its source.
92 TmfEventSource source
= new TmfEventSource(this);
93 LttngEventType dtype
= new LttngEventType();
// NOTE(review): the timestamp argument is on a line not visible in this view.
94 LttngTimestamp statusTimeStamp
= new LttngTimestamp(
// Status event carries no base event data: only timestamp, source and type.
97 fStatusEvent
= new LttngSyntheticEvent(null, statusTimeStamp
, source
,
98 dtype
, null, null, null);
// Default indication is STARTREQ; copies override it as needed.
99 fStatusEvent
.setSequenceInd(SequenceInd
.STARTREQ
);
102 // ========================================================================
104 // ========================================================================
// Arms the provider for a synthetic-event request:
//  1. validates the request type, the experiment and the list of trace managers,
//  2. restores every trace manager to a checkpoint and, if that checkpoint lies
//     before the requested start, widens the range / index / event count,
//  3. builds a single LttngBaseEventRequest covering the adjusted range whose
//     callbacks turn base events into synthetic events,
//  4. sends that request to the experiment and returns a placeholder TmfContext.
// The unchecked suppression covers the generic casts of the request and of the
// experiment node value.
106 @SuppressWarnings("unchecked")
108 public ITmfContext
armRequest(final ITmfDataRequest
<LttngSyntheticEvent
> request
) {
110 // make sure we have the right type of request
111 if (!(request
instanceof ITmfEventRequest
<?
>)) {
113 TraceDebug
.debug("Request is not an instance of ITmfEventRequest"); //$NON-NLS-1$
117 if (fExperiment
== null) {
118 TraceDebug
.debug("Experiment is null"); //$NON-NLS-1$
123 // get ready to start processing
126 // At least one base provider shall be available
127 if (fEventProviderRequests
.size() < 1) {
129 TraceDebug
.debug("No Base event providers available"); //$NON-NLS-1$
// From here on the request is the one being served by this provider.
133 fmainRequest
= request
;
135 // define event data handling
136 ITmfEventRequest
<LttngSyntheticEvent
> eventRequest
= (ITmfEventRequest
<LttngSyntheticEvent
>) fmainRequest
;
137 TmfTimeRange reqWindow
= eventRequest
.getRange();
139 TraceDebug
.debug("Main Synthethic event request started on thread: " + Thread
.currentThread().getName()); //$NON-NLS-1$
// The setup below is bracketed by startSynch here and endSynch at the end of
// this method.
141 TmfExperiment
<LttngEvent
> experiment
= (TmfExperiment
<LttngEvent
>) fExperiment
.getValue();
142 experiment
.startSynch(new TmfStartSynchSignal(0));
// Start from the request as received; the loop below may widen these values.
144 TmfTimeRange adjustedRange
= reqWindow
;
145 long adjustedIndex
= eventRequest
.getIndex();
146 int nbRequested
= eventRequest
.getNbRequested();
148 // Figure-out if we need to increase the range of the request: if some
149 // checkpoints are before the beginning of the range, increase the
150 // range to catch them. We will then exercise the state system of
151 // those traces until the requested beginning time range, discarding
152 // the unrequested data.
153 IStateTraceManager traceManager
;
154 Iterator
<IStateTraceManager
> iter
= fEventProviderRequests
.iterator();
155 // For each traceManager in the current experiment...
156 while(iter
.hasNext()) {
157 traceManager
= iter
.next();
158 // restore trace state system to nearest check point
159 TmfCheckpoint checkPoint
= null;
// A positive request index selects the checkpoint by index; the timestamp
// lookup is the alternative (the branch keyword is on a line not visible here).
160 if (eventRequest
.getIndex() > 0) {
161 checkPoint
= traceManager
.restoreCheckPointByIndex(eventRequest
.getIndex());
163 checkPoint
= traceManager
.restoreCheckPointByTimestamp(reqWindow
.getStartTime());
166 // validate that the checkpoint restored is within requested bounds
167 // (not outside the current trace's range or after the end of requested range)
168 TmfTimeRange traceRange
= traceManager
.getTrace().getTimeRange();
169 if ((checkPoint
== null) ||
170 checkPoint
.getTimestamp().getValue() < traceRange
.getStartTime().getValue() ||
171 checkPoint
.getTimestamp().getValue() > traceRange
.getEndTime().getValue() ||
172 checkPoint
.getTimestamp().getValue() >= reqWindow
.getEndTime().getValue()
174 // checkpoint is out of trace bounds; no need to adjust request for this
178 // use checkpoint time as new startTime for request if it's earlier than
// Widen when the checkpoint precedes the current start either in time or in
// event index.
180 if (adjustedRange
.getStartTime().getValue() > checkPoint
.getTimestamp().getValue() || adjustedIndex
> (Long
) checkPoint
.getLocation().getLocation()) {
181 adjustedRange
= new TmfTimeRange(checkPoint
.getTimestamp(), reqWindow
.getEndTime());
182 adjustedIndex
= (Long
) checkPoint
.getLocation().getLocation();
// Bounded request: add the events between the checkpoint index and the
// requested index so the caller still receives the amount it asked for.
183 if (nbRequested
< TmfDataRequest
.ALL_DATA
) {
184 nbRequested
+= (eventRequest
.getIndex() - adjustedIndex
);
188 // Save which trace state model corresponds to current trace
189 traceToTraceStateModel
.put(traceManager
.getTrace(), traceManager
.getStateModel());
// Remember the originally requested start; events before it only rebuild
// state and are not forwarded (see handleIncomingData below).
192 dispatchTime
= reqWindow
.getStartTime().getValue();
193 dispatchIndex
= eventRequest
.getIndex();
// NOTE(review): eventIndex is declared on a line not visible in this view.
194 eventIndex
= adjustedIndex
;
196 // Create a single request for all traces in the experiment, with coalesced time range.
197 fSubRequest
= new LttngBaseEventRequest(adjustedRange
, reqWindow
.getStartTime(),
198 adjustedIndex
, nbRequested
, BLOCK_SIZE
, eventRequest
.getExecType() /*ITmfDataRequest.ExecutionType.FOREGROUND*/) {
// Synthetic wrapper reused while the incoming base event reference is unchanged.
200 private LttngSyntheticEvent syntheticEvent
= null;
205 * @see org.eclipse.linuxtools.lttng.control.LttngEventRequest#handleData()
// Callback for each base event delivered by the experiment.
208 public void handleData(LttngEvent event
) {
209 super.handleData(event
);
// Locks on the enclosing provider instance, the same monitor used by its
// synchronized methods.
211 synchronized (LttngSyntheticEventProvider
.this) {
212 // Check if request was canceled
213 if ((fmainRequest
== null) || (fmainRequest
.isCompleted()) ) {
214 TraceDebug
.debug("fmainRequest was canceled. Ignoring event " + event
); //$NON-NLS-1$
218 handleIncomingData(event
);
221 TraceDebug
.debug("handle data received with no data"); //$NON-NLS-1$
228 * @see org.eclipse.linuxtools.tmf.request.TmfDataRequest#handleCompleted()
// Completion callback: success means the sub-request was neither cancelled
// nor failed.
231 public void handleCompleted() {
232 // mark this sub-request as completed
233 handleProviderDone(!isCancelled() && !isFailed());
234 super.handleCompleted();
238 * Trigger the Analysis and sequential control of the events.
// Applies one base event to the state system and, when it is inside the
// requested window, also forwards it to the main request.
242 private void handleIncomingData(LttngEvent e
) {
243 long eventTime
= e
.getTimestamp().getValue();
245 ITmfTrace inTrace
= e
.getParentTrace();
246 LttngTraceState traceModel
= traceToTraceStateModel
.get(inTrace
);
248 // queue the new event data
251 // If time at or above requested time, update application
252 if (eventTime
>= dispatchTime
&& eventIndex
>= dispatchIndex
) {
// The same synthetic event is delivered to the main request twice: once as
// BEFORE and once as AFTER, with the state update (UPDATE) in between.
254 syntheticEvent
.setSequenceInd(SequenceInd
.BEFORE
);
255 fmainRequest
.handleData(syntheticEvent
);
257 // Update state locally
258 syntheticEvent
.setSequenceInd(SequenceInd
.UPDATE
);
259 fstateUpdateProcessor
.process(syntheticEvent
, traceModel
);
262 syntheticEvent
.setSequenceInd(SequenceInd
.AFTER
);
263 fmainRequest
.handleData(syntheticEvent
);
266 // event time is between checkpoint adjusted time and
267 // requested time i.e. application does not expect the
268 // event, however the state system needs to be re-built
269 // to the dispatch point
270 syntheticEvent
.setSequenceInd(SequenceInd
.UPDATE
);
271 fstateUpdateProcessor
.process(syntheticEvent
, traceModel
);
277 * Create a synthetic event from the received new reference, if
278 * the reference is the same there is no need for a new instance
280 * if this is the first event for this request, call start
// Returns the synthetic event wrapping e, creating a new wrapper only when
// the base event reference differs from the one currently wrapped.
286 private LttngSyntheticEvent
updateSynEvent(LttngEvent e
) {
287 if ((syntheticEvent
== null) || (syntheticEvent
.getBaseEvent() != e
)) {
288 syntheticEvent
= new LttngSyntheticEvent(e
);
291 ITmfTrace inTrace
= e
.getParentTrace();
292 LttngTraceState traceModel
= traceToTraceStateModel
.get(inTrace
);
294 // Trace model needed by application handlers
295 syntheticEvent
.setTraceModel(traceModel
);
297 // send the start request indication once per request thread
299 TraceDebug
.debug("Thread started: " + Thread
.currentThread().getName()); //$NON-NLS-1$
300 handleProviderStarted(traceModel
);
304 return syntheticEvent
;
// Hand the coalesced sub-request to the experiment.
309 TmfExperiment
<LttngEvent
> provider
= (TmfExperiment
<LttngEvent
>) fExperiment
.getValue();
310 provider
.sendRequest(fSubRequest
);
312 // notify LTTngEvent provider that all requests were sent
// Clears the pending-request flag raised in sendRequest().
313 synchronized (this) {
314 TmfExperiment
.getCurrentExperiment().notifyPendingRequest(false);
315 fIsExperimentNotified
= false;
// Closes the synch section opened with startSynch above.
318 experiment
.endSynch(new TmfEndSynchSignal(0));
320 // Return a dummy context, not used for relay provider
321 return new TmfContext();
325 * Notify listeners to prepare to receive data e.g. clean previous data etc.
// Emits the start-of-request indication: a copy of the status event marked
// STARTREQ is delivered first to the main request and then to the state
// update processor.
// traceModel is not used by the visible body; the processor is called with a
// null trace model.
327 public synchronized void handleProviderStarted(LttngTraceState traceModel
) {
// Copy the template so the shared status event is not modified.
328 LttngSyntheticEvent startIndEvent
= new LttngSyntheticEvent(fStatusEvent
);
329 startIndEvent
.setSequenceInd(SequenceInd
.STARTREQ
);
331 // Notify application
332 fmainRequest
.handleData(startIndEvent
);
334 // Notify state event processor
335 fstateUpdateProcessor
.process(startIndEvent
, null);
339 * Notify listeners, no more events for the current request will be
340 * distributed e.g. update view.
// Emits one ENDREQ indication per known trace state model to the main
// request, then finishes or cancels the main request.
// isSuccess is computed by the caller as "not cancelled and not failed".
// NOTE(review): the branch that selects between finishing and cancelling is on
// lines not visible in this view; presumably it depends on isSuccess - confirm.
342 public synchronized void handleProviderDone(boolean isSuccess
) {
343 // Notify application. One notification per trace so the last state of each trace can be
345 for (LttngTraceState traceModel
: traceToTraceStateModel
.values()) {
346 // Take the trace model from traceToTraceStateModel list since it has a copy
// Fresh copy of the status template for each trace, tagged with its model.
348 LttngSyntheticEvent finishEvent
= new LttngSyntheticEvent(fStatusEvent
);
349 finishEvent
.setSequenceInd(SequenceInd
.ENDREQ
);
350 finishEvent
.setTraceModel(traceModel
);
352 fmainRequest
.handleData(finishEvent
);
356 // Finish main request
360 // Cancel main request
361 fmainRequest
.cancel();
367 * Reset provider to a state ready to begin thread execution
369 * @param experimentNode
// Prepares the provider for a new run: cancels any request still in flight,
// rebuilds the list of state trace managers and clears the start-indication
// flag, then switches to experimentNode if it differs from the current node.
// NOTE(review): the manager list is rebuilt from the current fExperiment
// before the node is replaced, so after a switch the list still reflects the
// previous experiment's children - confirm this ordering is intended.
371 public synchronized void reset(LTTngTreeNode experimentNode
) {
373 conditionallyCancelRequests();
375 fEventProviderRequests
.clear();
376 startIndSent
= false;
378 // set of base event providers
379 if (fExperiment
!= null) {
380 LTTngTreeNode
[] traces
= fExperiment
.getChildren();
381 for (LTTngTreeNode trace
: traces
) {
// Every child node is cast to IStateTraceManager without an instanceof check.
382 IStateTraceManager traceBaseEventProvider
= (IStateTraceManager
) trace
;
383 fEventProviderRequests
.add(traceBaseEventProvider
);
// Reference comparison: only a different node instance triggers the update.
387 if (fExperiment
!= experimentNode
) {
388 updateExperimentNode(experimentNode
);
393 * Point to a new experiment reference
397 private synchronized void updateExperimentNode(LTTngTreeNode experiment
) {
398 if (experiment
!= null
399 && experiment
.getValue() instanceof TmfExperiment
<?
>) {
400 fExperiment
= experiment
;
403 .debug("Experiment received is not instance of TmfExperiment: " //$NON-NLS-1$
404 + experiment
.getClass().getName());
412 * org.eclipse.linuxtools.tmf.component.TmfDataProvider#sendRequest(org.
413 * eclipse.linuxtools.tmf.request.TmfDataRequest)
// Sends a request through the base provider. On the first request since the
// last notification it flags the current experiment as having a pending
// request; when waitForRequest is set it then waits for the request to
// complete.
416 public void sendRequest(final ITmfDataRequest
<LttngSyntheticEvent
> request
) {
// The notification flag is read and written under the provider's monitor.
417 synchronized (this) {
418 if (!fIsExperimentNotified
) {
419 @SuppressWarnings("unchecked")
420 TmfExperiment
<LttngSyntheticEvent
> experiment
= (TmfExperiment
<LttngSyntheticEvent
>) TmfExperiment
.getCurrentExperiment();
// Without a current experiment the flag stays false and the next call retries.
421 if (experiment
!= null) {
422 experiment
.notifyPendingRequest(true);
423 fIsExperimentNotified
= true;
428 super.sendRequest(request
);
429 if (waitForRequest
) {
431 request
.waitForCompletion();
// NOTE(review): the catch body is not visible in this view; make sure the
// interruption is not silently swallowed.
432 } catch (InterruptedException e
) {
439 * @return the waitForRequest
// Reports whether sendRequest() waits for request completion before returning.
441 public boolean isWaitForRequest() {
442 return waitForRequest
;
446 * @param waitForRequest
447 * configures the provider to wait for the request completion
// Sets whether sendRequest() waits for request completion before returning.
449 public void setWaitForRequest(boolean waitForRequest
) {
450 this.waitForRequest
= waitForRequest
;
// Waits until the main request has completed. The context argument is not
// used by the visible body.
// NOTE(review): the return statement and the catch body are on lines not
// visible in this view; fmainRequest is dereferenced without a null check.
454 public LttngSyntheticEvent
getNext(ITmfContext context
) {
456 fmainRequest
.waitForCompletion();
457 } catch (InterruptedException e
) {
464 * Cancels the ongoing requests for this data provider if necessary
// Cancels the sub-request if one exists and has not completed yet; does
// nothing otherwise.
466 public synchronized void conditionallyCancelRequests() {
467 if ((fSubRequest
!= null) && (!fSubRequest
.isCompleted())) {
469 TraceDebug
.debug("Canceling synthethic event request!"); //$NON-NLS-1$
471 // This will also cancel the fmainRequest
472 fSubRequest
.cancel();
// NOTE(review): the statements that reset the references are not visible here.
473 // Reset the request references
// Queues a background request as a whole: blockSize and indexing are ignored
// and the request is passed unchanged to queueRequest().
480 protected void queueBackgroundRequest(ITmfDataRequest
<LttngSyntheticEvent
> request
, int blockSize
, boolean indexing
) {
481 // do not split background synthetic requests
482 queueRequest(request
);