Revisit the TmfExperiment structure
[deliverable/tracecompass.git] / org.eclipse.linuxtools.lttng.core / src / org / eclipse / linuxtools / internal / lttng / core / control / LttngSyntheticEventProvider.java
1 /*******************************************************************************
2 * Copyright (c) 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Alvaro Sanchez-Leon (alvsan09@gmail.com) - Initial API and implementation
11 * Marc Dumais (marc.dumais@ericsson.com) - Fix for 316455 (first part)
12 *******************************************************************************/
13
14 package org.eclipse.linuxtools.internal.lttng.core.control;
15
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Vector;
21
22 import org.eclipse.linuxtools.internal.lttng.core.TraceDebug;
23 import org.eclipse.linuxtools.internal.lttng.core.event.LttngEvent;
24 import org.eclipse.linuxtools.internal.lttng.core.event.LttngEventType;
25 import org.eclipse.linuxtools.internal.lttng.core.event.LttngSyntheticEvent;
26 import org.eclipse.linuxtools.internal.lttng.core.event.LttngSyntheticEvent.SequenceInd;
27 import org.eclipse.linuxtools.internal.lttng.core.event.LttngTimestamp;
28 import org.eclipse.linuxtools.internal.lttng.core.model.LTTngTreeNode;
29 import org.eclipse.linuxtools.internal.lttng.core.request.LttngBaseEventRequest;
30 import org.eclipse.linuxtools.internal.lttng.core.state.evProcessor.ITransEventProcessor;
31 import org.eclipse.linuxtools.internal.lttng.core.state.evProcessor.state.StateEventToHandlerFactory;
32 import org.eclipse.linuxtools.internal.lttng.core.state.model.LttngTraceState;
33 import org.eclipse.linuxtools.internal.lttng.core.state.trace.IStateTraceManager;
34 import org.eclipse.linuxtools.tmf.core.component.TmfEventProvider;
35 import org.eclipse.linuxtools.tmf.core.event.TmfTimeRange;
36 import org.eclipse.linuxtools.tmf.core.event.TmfTimestamp;
37 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
38 import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
39 import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
40 import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
41 import org.eclipse.linuxtools.tmf.core.signal.TmfStartSynchSignal;
42 import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
43 import org.eclipse.linuxtools.tmf.core.trace.ITmfTrace;
44 import org.eclipse.linuxtools.tmf.core.trace.TmfCheckpoint;
45 import org.eclipse.linuxtools.tmf.core.trace.TmfContext;
46 import org.eclipse.linuxtools.tmf.core.trace.TmfExperiment;
47
48 /**
49 * @author alvaro
50 *
51 */
52 public class LttngSyntheticEventProvider extends TmfEventProvider<LttngSyntheticEvent> {
53
54 // ========================================================================
55 // Data
56 // ========================================================================
57 public static final int BLOCK_SIZE = 50000;
58 public static final int NB_EVENTS = 1;
59 public static final int QUEUE_SIZE = 1; // lttng specific, one event at a time
60
61 private ITmfDataRequest<LttngSyntheticEvent> fmainRequest = null;
62 private LttngBaseEventRequest fSubRequest = null;
63
64 private final List<IStateTraceManager> fEventProviderRequests = new Vector<IStateTraceManager>();
65
66 private final LttngSyntheticEvent fStatusEvent;
67 volatile boolean startIndSent = false;
68 private LTTngTreeNode fExperiment = null;
69 private ITransEventProcessor fstateUpdateProcessor = StateEventToHandlerFactory.getInstance();
70 private boolean waitForRequest = false;
71 long dispatchTime = 0L;
72 long dispatchIndex = 0L;
73 long eventIndex;
74 private final Map<ITmfTrace<?>, LttngTraceState> traceToTraceStateModel = new HashMap<ITmfTrace<?>, LttngTraceState>();
75
76 private boolean fIsExperimentNotified = false;
77
78 // ========================================================================
79 // Constructor
80 // ========================================================================
81 /**
82 * Accessibility to package - use factory instead of this constructor
83 *
84 * @param type
85 */
86 LttngSyntheticEventProvider(Class<LttngSyntheticEvent> type) {
87 super("LttngSyntheticEventProvider", type, QUEUE_SIZE); //$NON-NLS-1$
88
89 // prepare empty instance status indicators and allow them to travel via
90 // the framework
91 String source = this.toString();
92 LttngEventType dtype = new LttngEventType();
93 LttngTimestamp statusTimeStamp = new LttngTimestamp(
94 TmfTimestamp.ZERO);
95
96 fStatusEvent = new LttngSyntheticEvent(null, statusTimeStamp, source,
97 dtype, null, null, null);
98 fStatusEvent.setSequenceInd(SequenceInd.STARTREQ);
99 }
100
101 // ========================================================================
102 // Methods
103 // ========================================================================
104
105 @SuppressWarnings("unchecked")
106 @Override
107 public synchronized ITmfContext armRequest(final ITmfDataRequest<LttngSyntheticEvent> request) {
108 // validate
109 // make sure we have the right type of request
110 if (!(request instanceof ITmfEventRequest<?>)) {
111 request.cancel();
112 TraceDebug.debug("Request is not an instance of ITmfEventRequest"); //$NON-NLS-1$
113 return null;
114 }
115
116 if (fExperiment == null) {
117 TraceDebug.debug("Experiment is null"); //$NON-NLS-1$
118 request.cancel();
119 return null;
120 }
121
122 // get ready to start processing
123 reset(fExperiment);
124
125 // At least one base provider shall be available
126 if (fEventProviderRequests.size() < 1) {
127 request.cancel();
128 TraceDebug.debug("No Base event providers available"); //$NON-NLS-1$
129 return null;
130 }
131
132 fmainRequest = request;
133
134 // define event data handling
135 ITmfEventRequest<LttngSyntheticEvent> eventRequest = (ITmfEventRequest<LttngSyntheticEvent>) fmainRequest;
136 TmfTimeRange reqWindow = eventRequest.getRange();
137
138 TraceDebug.debug("Main Synthethic event request started on thread: " + Thread.currentThread().getName()); //$NON-NLS-1$
139
140 TmfExperiment<LttngEvent> experiment = (TmfExperiment<LttngEvent>) fExperiment.getValue();
141 experiment.startSynch(new TmfStartSynchSignal(0));
142
143 TmfTimeRange adjustedRange = reqWindow;
144 long adjustedIndex = eventRequest.getIndex();
145 int nbRequested = eventRequest.getNbRequested();
146
147 // Figure-out if we need to increase the range of the request: if some
148 // checkpoints are before the beginning of the range, increase the
149 // range to catch them. We will then exercise the state system of
150 // those traces until the requested beginning time range, discarding
151 // the unrequested data.
152 IStateTraceManager traceManager;
153 Iterator<IStateTraceManager> iter = fEventProviderRequests.iterator();
154 // For each traceManager in the current experiment...
155 while(iter.hasNext()) {
156 traceManager = iter.next();
157 // restore trace state system to nearest check point
158 TmfCheckpoint checkPoint = null;
159 if (eventRequest.getIndex() > 0) {
160 checkPoint = traceManager.restoreCheckPointByIndex(eventRequest.getIndex());
161 } else {
162 checkPoint = traceManager.restoreCheckPointByTimestamp(reqWindow.getStartTime());
163 }
164
165 // validate that the checkpoint restored is within requested bounds
166 // (not outside the current trace's range or after the end of requested range)
167 TmfTimeRange traceRange = traceManager.getStateTrace().getTimeRange();
168 if ((checkPoint == null) ||
169 checkPoint.getTimestamp().getValue() < traceRange.getStartTime().getValue() ||
170 checkPoint.getTimestamp().getValue() > traceRange.getEndTime().getValue() ||
171 checkPoint.getTimestamp().getValue() >= reqWindow.getEndTime().getValue()
172 ) {
173 // checkpoint is out of trace bounds; no need to adjust request for this
174 // trace
175 }
176 else {
177 // use checkpoint time as new startTime for request if it's earlier than
178 // current startTime
179 if (adjustedRange.getStartTime().getValue() > checkPoint.getTimestamp().getValue() || adjustedIndex > (Long) checkPoint.getLocation().getLocation()) {
180 adjustedRange = new TmfTimeRange(checkPoint.getTimestamp(), reqWindow.getEndTime());
181 adjustedIndex = (Long) checkPoint.getLocation().getLocation();
182 if (nbRequested < TmfDataRequest.ALL_DATA) {
183 nbRequested += (eventRequest.getIndex() - adjustedIndex);
184 }
185 }
186 }
187 // Save which trace state model corresponds to current trace
188 traceToTraceStateModel.put(traceManager.getStateTrace(), traceManager.getStateModel());
189 }
190
191 dispatchTime = reqWindow.getStartTime().getValue();
192 dispatchIndex = eventRequest.getIndex();
193 eventIndex = adjustedIndex;
194
195 // Create a single request for all traces in the experiment, with coalesced time range.
196 fSubRequest = new LttngBaseEventRequest(adjustedRange, reqWindow.getStartTime(),
197 adjustedIndex, nbRequested, BLOCK_SIZE, eventRequest.getExecType() /*ITmfDataRequest.ExecutionType.FOREGROUND*/) {
198
199 private LttngSyntheticEvent syntheticEvent = null;
200
201 /*
202 * (non-Javadoc)
203 *
204 * @see org.eclipse.linuxtools.lttng.control.LttngEventRequest#handleData()
205 */
206 @Override
207 public void handleData(LttngEvent event) {
208 super.handleData(event);
209 if (event != null) {
210 synchronized (LttngSyntheticEventProvider.this) {
211 // Check if request was canceled
212 if ((fmainRequest == null) || (fmainRequest.isCompleted()) ) {
213 TraceDebug.debug("fmainRequest was canceled. Ignoring event " + event); //$NON-NLS-1$
214 return;
215 }
216
217 handleIncomingData(event);
218 }
219 } else {
220 TraceDebug.debug("handle data received with no data"); //$NON-NLS-1$
221 }
222 }
223
224 /*
225 * (non-Javadoc)
226 *
227 * @see org.eclipse.linuxtools.tmf.request.TmfDataRequest#handleCompleted()
228 */
229 @Override
230 public void handleCompleted() {
231 // mark this sub-request as completed
232 handleProviderDone(!isCancelled() && !isFailed());
233 super.handleCompleted();
234 }
235
236 /**
237 * Trigger the Analysis and sequential control of the events.
238 *
239 * @param e
240 */
241 private void handleIncomingData(LttngEvent e) {
242 long eventTime = e.getTimestamp().getValue();
243
244 ITmfTrace<?> inTrace = e.getTrace();
245 LttngTraceState traceModel = traceToTraceStateModel.get(inTrace);
246
247 // queue the new event data
248 updateSynEvent(e);
249
250 // If time at or above requested time, update application
251 if (eventTime >= dispatchTime && eventIndex >= dispatchIndex) {
252 // Before update
253 syntheticEvent.setSequenceInd(SequenceInd.BEFORE);
254 fmainRequest.handleData(syntheticEvent);
255
256 // Update state locally
257 syntheticEvent.setSequenceInd(SequenceInd.UPDATE);
258 fstateUpdateProcessor.process(syntheticEvent, traceModel);
259
260 // After Update
261 syntheticEvent.setSequenceInd(SequenceInd.AFTER);
262 fmainRequest.handleData(syntheticEvent);
263
264 } else {
265 // event time is between checkpoint adjusted time and
266 // requested time i.e. application does not expect the
267 // event, however the state system needs to be re-built
268 // to the dispatch point
269 syntheticEvent.setSequenceInd(SequenceInd.UPDATE);
270 fstateUpdateProcessor.process(syntheticEvent, traceModel);
271 }
272 eventIndex++;
273 }
274
275 /**
276 * Create a synthetic event from the received new reference, if
277 * the reference is the same there is no need for a new instance
278 *
279 * if this is the first event for this request, call start
280 * handler
281 *
282 * @param e
283 * @return
284 */
285 private LttngSyntheticEvent updateSynEvent(LttngEvent e) {
286 if ((syntheticEvent == null) || (syntheticEvent.getBaseEvent() != e)) {
287 syntheticEvent = new LttngSyntheticEvent(e);
288 }
289
290 ITmfTrace<?> inTrace = e.getTrace();
291 LttngTraceState traceModel = traceToTraceStateModel.get(inTrace);
292
293 // Trace model needed by application handlers
294 syntheticEvent.setTraceModel(traceModel);
295
296 // send the start request indication once per request thread
297 if (!startIndSent) {
298 TraceDebug.debug("Thread started: " + Thread.currentThread().getName()); //$NON-NLS-1$
299 handleProviderStarted(traceModel);
300 startIndSent = true;
301 }
302
303 return syntheticEvent;
304 }
305 };
306
307 // start request
308 TmfExperiment<LttngEvent> provider = (TmfExperiment<LttngEvent>) fExperiment.getValue();
309 provider.sendRequest(fSubRequest);
310
311 // notify LTTngEvent provider that all requests were sent
312 synchronized (this) {
313 TmfExperiment.getCurrentExperiment().notifyPendingRequest(false);
314 fIsExperimentNotified = false;
315 }
316
317 experiment.endSynch(new TmfEndSynchSignal(0));
318
319 // Return a dummy context, not used for relay provider
320 return new TmfContext();
321 }
322
323 /**
324 * Notify listeners to prepare to receive data e.g. clean previous data etc.
325 */
326 public synchronized void handleProviderStarted(LttngTraceState traceModel) {
327 LttngSyntheticEvent startIndEvent = new LttngSyntheticEvent(fStatusEvent);
328 startIndEvent.setSequenceInd(SequenceInd.STARTREQ);
329
330 // Notify application
331 fmainRequest.handleData(startIndEvent);
332
333 // Notify state event processor
334 fstateUpdateProcessor.process(startIndEvent, null);
335 }
336
337 /**
338 * Notify listeners, no more events for the current request will be
339 * distributed e.g. update view.
340 */
341 public synchronized void handleProviderDone(boolean isSuccess) {
342 // Notify application. One notification per trace so the last state of each trace can be
343 // drawn
344 for (LttngTraceState traceModel : traceToTraceStateModel.values()) {
345 // Take the trace model from traceToTraceStateModel list since it has a copy
346 // of the state
347 LttngSyntheticEvent finishEvent = new LttngSyntheticEvent(fStatusEvent);
348 finishEvent.setSequenceInd(SequenceInd.ENDREQ);
349 finishEvent.setTraceModel(traceModel);
350
351 fmainRequest.handleData(finishEvent);
352 }
353
354 if(isSuccess) {
355 // Finish main request
356 fmainRequest.done();
357 }
358 else {
359 // Cancel main request
360 fmainRequest.cancel();
361
362 }
363 }
364
365 /**
366 * Reset provider to a state ready to begin thread execution
367 *
368 * @param experimentNode
369 */
370 public synchronized void reset(LTTngTreeNode experimentNode) {
371
372 conditionallyCancelRequests();
373
374 fEventProviderRequests.clear();
375 startIndSent = false;
376
377 // set of base event providers
378 if (fExperiment != null) {
379 LTTngTreeNode[] traces = fExperiment.getChildren();
380 for (LTTngTreeNode trace : traces) {
381 IStateTraceManager traceBaseEventProvider = (IStateTraceManager) trace;
382 fEventProviderRequests.add(traceBaseEventProvider);
383 }
384 }
385
386 if (fExperiment != experimentNode) {
387 updateExperimentNode(experimentNode);
388 }
389 }
390
391 /**
392 * Point to a new experiment reference
393 *
394 * @param experiment
395 */
396 private synchronized void updateExperimentNode(LTTngTreeNode experiment) {
397 if (experiment == null)
398 return;
399 if (experiment.getValue() instanceof TmfExperiment<?>) {
400 fExperiment = experiment;
401 } else {
402 TraceDebug.debug("Experiment received is not instance of TmfExperiment: " //$NON-NLS-1$
403 + experiment.getClass().getName());
404 }
405 }
406
407 /*
408 * (non-Javadoc)
409 *
410 * @see
411 * org.eclipse.linuxtools.tmf.component.TmfDataProvider#sendRequest(org.
412 * eclipse.linuxtools.tmf.request.TmfDataRequest)
413 */
414 @Override
415 public void sendRequest(final ITmfDataRequest<LttngSyntheticEvent> request) {
416 synchronized (this) {
417 if (!fIsExperimentNotified) {
418 @SuppressWarnings("unchecked")
419 TmfExperiment<LttngSyntheticEvent> experiment = (TmfExperiment<LttngSyntheticEvent>) TmfExperiment.getCurrentExperiment();
420 if (experiment != null) {
421 experiment.notifyPendingRequest(true);
422 fIsExperimentNotified = true;
423 }
424 }
425 }
426
427 super.sendRequest(request);
428 if (waitForRequest) {
429 try {
430 request.waitForCompletion();
431 } catch (InterruptedException e) {
432 e.printStackTrace();
433 }
434 }
435 }
436
437 /**
438 * @return the waitForRequest
439 */
440 public boolean isWaitForRequest() {
441 return waitForRequest;
442 }
443
444 /**
445 * @param waitForRequest
446 * configures the provider to wait for the request completion
447 */
448 public void setWaitForRequest(boolean waitForRequest) {
449 this.waitForRequest = waitForRequest;
450 }
451
452 @Override
453 public LttngSyntheticEvent getNext(ITmfContext context) {
454 try {
455 fmainRequest.waitForCompletion();
456 } catch (InterruptedException e) {
457 e.printStackTrace();
458 }
459 return null;
460 }
461
462 /**
463 * Cancels the ongoing requests for this data provider if necessary
464 */
465 public synchronized void conditionallyCancelRequests() {
466 if ((fSubRequest != null) && (!fSubRequest.isCompleted())) {
467
468 TraceDebug.debug("Canceling synthethic event request!"); //$NON-NLS-1$
469
470 // This will also cancel the fmainRequest
471 fSubRequest.cancel();
472 // Reset the request references
473 fSubRequest = null;
474 fmainRequest = null;
475 }
476 }
477
478 @Override
479 protected void queueBackgroundRequest(ITmfDataRequest<LttngSyntheticEvent> request, int blockSize, boolean indexing) {
480 // do not split background synthetic requests
481 queueRequest(request);
482 }
483 }
This page took 0.070189 seconds and 6 git commands to generate.