tmf: remove deprecated methods from tmf
[deliverable/tracecompass.git] / tmf / org.eclipse.tracecompass.tmf.core / src / org / eclipse / tracecompass / tmf / core / trace / experiment / TmfExperiment.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2015 Ericsson, École Polytechnique de Montréal
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 * Francois Chouinard - Updated as per TMF Trace Model 1.0
12 * Patrick Tasse - Updated for removal of context clone
13 * Patrick Tasse - Updated for ranks in experiment location
14 * Geneviève Bastien - Added support of experiment synchronization
15 * Added the initExperiment method and default constructor
16 * Bernd Hufmann - Updated for added interfaces to ITmfEventProvider
17 *******************************************************************************/
18
19 package org.eclipse.tracecompass.tmf.core.trace.experiment;
20
21 import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNull;
22
23 import java.io.File;
24 import java.math.BigInteger;
25 import java.nio.ByteBuffer;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.concurrent.locks.Lock;
30 import java.util.concurrent.locks.ReentrantLock;
31 import java.util.function.Function;
32 import java.util.stream.Collectors;
33
34 import org.eclipse.core.resources.IProject;
35 import org.eclipse.core.resources.IResource;
36 import org.eclipse.core.runtime.CoreException;
37 import org.eclipse.core.runtime.IStatus;
38 import org.eclipse.core.runtime.MultiStatus;
39 import org.eclipse.core.runtime.Status;
40 import org.eclipse.jdt.annotation.NonNull;
41 import org.eclipse.jdt.annotation.Nullable;
42 import org.eclipse.tracecompass.internal.tmf.core.Activator;
43 import org.eclipse.tracecompass.internal.tmf.core.synchronization.TmfTimestampTransform;
44 import org.eclipse.tracecompass.internal.tmf.core.trace.experiment.TmfExperimentContext;
45 import org.eclipse.tracecompass.internal.tmf.core.trace.experiment.TmfExperimentLocation;
46 import org.eclipse.tracecompass.internal.tmf.core.trace.experiment.TmfLocationArray;
47 import org.eclipse.tracecompass.tmf.core.TmfCommonConstants;
48 import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
49 import org.eclipse.tracecompass.tmf.core.exceptions.TmfTraceException;
50 import org.eclipse.tracecompass.tmf.core.project.model.ITmfPropertiesProvider;
51 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest;
52 import org.eclipse.tracecompass.tmf.core.signal.TmfSignalHandler;
53 import org.eclipse.tracecompass.tmf.core.signal.TmfTraceOpenedSignal;
54 import org.eclipse.tracecompass.tmf.core.signal.TmfTraceRangeUpdatedSignal;
55 import org.eclipse.tracecompass.tmf.core.signal.TmfTraceSynchronizedSignal;
56 import org.eclipse.tracecompass.tmf.core.synchronization.ITmfTimestampTransform;
57 import org.eclipse.tracecompass.tmf.core.synchronization.SynchronizationAlgorithm;
58 import org.eclipse.tracecompass.tmf.core.synchronization.SynchronizationManager;
59 import org.eclipse.tracecompass.tmf.core.synchronization.TimestampTransformFactory;
60 import org.eclipse.tracecompass.tmf.core.timestamp.ITmfTimestamp;
61 import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimeRange;
62 import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimestamp;
63 import org.eclipse.tracecompass.tmf.core.trace.ITmfContext;
64 import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace;
65 import org.eclipse.tracecompass.tmf.core.trace.TmfTrace;
66 import org.eclipse.tracecompass.tmf.core.trace.TmfTraceManager;
67 import org.eclipse.tracecompass.tmf.core.trace.indexer.ITmfPersistentlyIndexable;
68 import org.eclipse.tracecompass.tmf.core.trace.indexer.ITmfTraceIndexer;
69 import org.eclipse.tracecompass.tmf.core.trace.indexer.TmfBTreeTraceIndexer;
70 import org.eclipse.tracecompass.tmf.core.trace.location.ITmfLocation;
71
72 import com.google.common.collect.HashMultimap;
73 import com.google.common.collect.Multimap;
74
75 /**
76 * TmfExperiment presents a time-ordered, unified view of a set of ITmfTrace:s
77 * that are part of a tracing experiment.
78 *
79 * @version 1.0
80 * @author Francois Chouinard
81 */
82 public class TmfExperiment extends TmfTrace implements ITmfPersistentlyIndexable {
83
84 // ------------------------------------------------------------------------
85 // Constants
86 // ------------------------------------------------------------------------
87
88 /**
89 * The name of the directory containing trace synchronization data. This
90 * directory typically will be preserved when traces are synchronized.
91 * Analysis involved in synchronization can put their supplementary files in
92 * there so they are not deleted when synchronized traces are copied.
93 */
94 private static final String SYNCHRONIZATION_DIRECTORY = "sync_data"; //$NON-NLS-1$
95
96 /**
97 * The default index page size
98 */
99 public static final int DEFAULT_INDEX_PAGE_SIZE = 5000;
100
101 /**
102 * Property name for traces defining a clock offset.
103 */
104 private static final String CLOCK_OFFSET_PROPERTY = "clock_offset"; //$NON-NLS-1$
105
106 /**
107 * If the automatic clock offset is higher than this value, emit a warning.
108 */
109 private static final long CLOCK_OFFSET_THRESHOLD_NS = 500000;
110
111 // ------------------------------------------------------------------------
112 // Attributes
113 // ------------------------------------------------------------------------
114
115 /**
116 * The set of traces that constitute the experiment
117 */
118 private boolean fInitialized = false;
119
120 /**
121 * Lock for synchronization methods. These methods cannot be 'synchronized'
122 * since it makes it impossible to use an event request on the experiment
123 * during synchronization (the request thread would block)
124 */
125 private final Lock fSyncLock = new ReentrantLock();
126
127 // ------------------------------------------------------------------------
128 // Construction
129 // ------------------------------------------------------------------------
130
131 /**
132 * Default constructor. Should not be used directly, but is needed for
133 * extension points.
134 *
135 * @deprecated Do not call this directly (but do not remove it either!)
136 */
137 @Deprecated
138 public TmfExperiment() {
139 super();
140 }
141
142 /**
143 * Constructor of an experiment, taking the type, path, traces,
144 * indexPageSize and resource
145 *
146 * @param type
147 * The event type
148 * @param path
149 * The experiment path
150 * @param traces
151 * The experiment set of traces
152 * @param indexPageSize
153 * The experiment index page size. You can use
154 * {@link TmfExperiment#DEFAULT_INDEX_PAGE_SIZE} for a default
155 * value.
156 * @param resource
157 * The resource associated to the experiment. You can use 'null'
158 * for no resources (tests, etc.)
159 */
160 public TmfExperiment(final Class<? extends ITmfEvent> type,
161 final String path,
162 final ITmfTrace[] traces,
163 final int indexPageSize,
164 final @Nullable IResource resource) {
165 initExperiment(type, path, traces, indexPageSize, resource);
166 }
167
168 @Override
169 protected ITmfTraceIndexer createIndexer(int interval) {
170 if (getCheckpointSize() > 0) {
171 return new TmfBTreeTraceIndexer(this, interval);
172 }
173 return super.createIndexer(interval);
174 }
175
176 /**
177 * Clears the experiment
178 */
179 @Override
180 public synchronized void dispose() {
181
182 // Clean up the index if applicable
183 if (getIndexer() != null) {
184 getIndexer().dispose();
185 }
186
187 super.dispose();
188 }
189
190 // ------------------------------------------------------------------------
191 // ITmfTrace - Initializers
192 // ------------------------------------------------------------------------
193
194 @Override
195 public void initTrace(final IResource resource, final String path, final Class<? extends ITmfEvent> type) {
196 /* Do nothing for experiments */
197 }
198
199 /**
200 * Initialization of an experiment, taking the type, path, traces,
201 * indexPageSize and resource
202 *
203 * @param type
204 * the event type
205 * @param path
206 * the experiment path
207 * @param traces
208 * the experiment set of traces
209 * @param indexPageSize
210 * the experiment index page size
211 * @param resource
212 * the resource associated to the experiment
213 */
214 public void initExperiment(final Class<? extends ITmfEvent> type,
215 final String path,
216 final ITmfTrace[] traces,
217 final int indexPageSize,
218 final @Nullable IResource resource) {
219
220 setCacheSize(indexPageSize);
221 setStreamingInterval(0);
222
223 Multimap<String, ITmfTrace> tracesPerHost = HashMultimap.create();
224
225 // traces have to be set before super.initialize()
226 if (traces != null) {
227 // initialize
228 for (ITmfTrace trace : traces) {
229 if (trace != null) {
230 tracesPerHost.put(trace.getHostId(), trace);
231 addChild(trace);
232 }
233 }
234 }
235
236 try {
237 super.initialize(resource, path, type);
238 } catch (TmfTraceException e) {
239 Activator.logError("Error initializing experiment", e); //$NON-NLS-1$
240 }
241
242 if (resource != null) {
243 synchronizeTraces();
244 }
245
246 /*
247 * For all traces on the same host, if two or more specify different
248 * clock offsets, adjust their clock offset by the average of all of them.
249 *
250 * See https://bugs.eclipse.org/bugs/show_bug.cgi?id=484620
251 */
252 Function<ITmfPropertiesProvider, @Nullable Long> offsetGetter = trace -> {
253 String offset = trace.getProperties().get(CLOCK_OFFSET_PROPERTY);
254 if (offset == null) {
255 return null;
256 }
257 try {
258 return Long.parseLong(offset);
259 } catch (NumberFormatException e) {
260 return null;
261 }
262 };
263
264 for (String host : tracesPerHost.keySet()) {
265 /*
266 * Only attempt to synchronize traces that provide a clock_offset
267 * property.
268 */
269 Collection<ITmfPropertiesProvider> tracesToSynchronize = tracesPerHost.get(host).stream()
270 .filter(trace -> trace instanceof ITmfPropertiesProvider)
271 .map(trace -> (ITmfPropertiesProvider) trace)
272 .filter(trace -> offsetGetter.apply(trace) != null)
273 .collect(Collectors.toList());
274
275 if (tracesToSynchronize.size() < 2) {
276 continue;
277 }
278
279 /* Only synchronize traces if they haven't previously been synchronized */
280 if (tracesToSynchronize.stream()
281 .map(trace -> ((ITmfTrace) trace).getTimestampTransform())
282 .anyMatch(transform -> !transform.equals(TmfTimestampTransform.IDENTITY))) {
283 continue;
284 }
285
286 /* Calculate the average of all clock offsets */
287 BigInteger sum = BigInteger.ZERO;
288 for (ITmfPropertiesProvider trace : tracesToSynchronize) {
289 long offset = checkNotNull(offsetGetter.apply(trace));
290 sum = sum.add(BigInteger.valueOf(offset));
291 }
292 long average = sum.divide(BigInteger.valueOf(tracesToSynchronize.size())).longValue();
293
294 if (average > CLOCK_OFFSET_THRESHOLD_NS) {
295 Activator.logWarning("Average clock correction (" + average + ") is higher than threshold of " + //$NON-NLS-1$ //$NON-NLS-2$
296 CLOCK_OFFSET_THRESHOLD_NS + " ns for experiment " + this.toString()); //$NON-NLS-1$
297 }
298
299 /*
300 * Apply the offset correction to all identified traces, but only if
301 * they do not already have an equivalent one (for example, closing
302 * and re-opening the same experiment should not retrigger building
303 * all supplementary files).
304 */
305 tracesToSynchronize.forEach(t -> {
306 long offset = checkNotNull(offsetGetter.apply(t));
307 long delta = average - offset;
308
309 ITmfTrace trace = (ITmfTrace) t;
310 ITmfTimestampTransform currentTransform = trace.getTimestampTransform();
311 ITmfTimestampTransform newTransform = TimestampTransformFactory.createWithOffset(delta);
312
313 if (!newTransform.equals(currentTransform)) {
314 TmfTraceManager.deleteSupplementaryFiles(trace);
315 trace.setTimestampTransform(newTransform);
316 }
317 });
318 }
319 }
320
321 @Override
322 public IStatus validate(final IProject project, final String path) {
323 return Status.OK_STATUS;
324 }
325
326 // ------------------------------------------------------------------------
327 // Accessors
328 // ------------------------------------------------------------------------
329
330 /**
331 * Get the traces contained in this experiment.
332 *
333 * @return The array of contained traces
334 */
335 public List<@NonNull ITmfTrace> getTraces() {
336 return getChildren(ITmfTrace.class);
337 }
338
339 /**
340 * Returns the timestamp of the event at the requested index. If none,
341 * returns null.
342 *
343 * @param index
344 * the event index (rank)
345 * @return the corresponding event timestamp
346 */
347 public ITmfTimestamp getTimestamp(final int index) {
348 final ITmfContext context = seekEvent(index);
349 final ITmfEvent event = getNext(context);
350 context.dispose();
351 return (event != null) ? event.getTimestamp() : null;
352 }
353
354 // ------------------------------------------------------------------------
355 // Request management
356 // ------------------------------------------------------------------------
357
358 @Override
359 public synchronized ITmfContext armRequest(final ITmfEventRequest request) {
360
361 // Make sure we have something to read from
362 if (getChildren().isEmpty()) {
363 return null;
364 }
365
366 if (!TmfTimestamp.BIG_BANG.equals(request.getRange().getStartTime())
367 && request.getIndex() == 0) {
368 final ITmfContext context = seekEvent(request.getRange().getStartTime());
369 request.setStartIndex((int) context.getRank());
370 return context;
371
372 }
373
374 return seekEvent(request.getIndex());
375 }
376
377 // ------------------------------------------------------------------------
378 // ITmfTrace trace positioning
379 // ------------------------------------------------------------------------
380
381 @Override
382 public synchronized ITmfContext seekEvent(final ITmfLocation location) {
383 // Validate the location
384 if (location != null && !(location instanceof TmfExperimentLocation)) {
385 return null; // Throw an exception?
386 }
387
388 int length = getNbChildren();
389
390 // Initialize the location array if necessary
391 TmfLocationArray locationArray = ((location == null) ? new TmfLocationArray(length) : ((TmfExperimentLocation) location).getLocationInfo());
392
393 ITmfLocation[] locations = locationArray.getLocations();
394 long[] ranks = locationArray.getRanks();
395
396 // Create and populate the context's traces contexts
397 final TmfExperimentContext context = new TmfExperimentContext(length);
398
399 // Position the traces
400 long rank = 0;
401 for (int i = 0; i < length; i++) {
402 // Get the relevant trace attributes
403 final ITmfContext traceContext = ((ITmfTrace) getChild(i)).seekEvent(locations[i]);
404 context.setContext(i, traceContext);
405 traceContext.setRank(ranks[i]);
406 // update location after seek
407 locations[i] = traceContext.getLocation();
408 context.setEvent(i, ((ITmfTrace) getChild(i)).getNext(traceContext));
409 rank += ranks[i];
410 }
411
412 // Finalize context
413 context.setLocation(new TmfExperimentLocation(new TmfLocationArray(locations, ranks)));
414 context.setLastTrace(TmfExperimentContext.NO_TRACE);
415 context.setRank(rank);
416
417 return context;
418 }
419
420 // ------------------------------------------------------------------------
421 // ITmfTrace - SeekEvent operations (returning a trace context)
422 // ------------------------------------------------------------------------
423
424 @Override
425 public ITmfContext seekEvent(final double ratio) {
426 final ITmfContext context = seekEvent(Math.round(ratio * getNbEvents()));
427 return context;
428 }
429
430 @Override
431 public double getLocationRatio(final ITmfLocation location) {
432 if (location instanceof TmfExperimentLocation) {
433 long rank = 0;
434 TmfLocationArray locationArray = ((TmfExperimentLocation) location).getLocationInfo();
435 for (int i = 0; i < locationArray.size(); i++) {
436 rank += locationArray.getRank(i);
437 }
438 return (double) rank / getNbEvents();
439 }
440 return 0.0;
441 }
442
443 @Override
444 public ITmfLocation getCurrentLocation() {
445 // never used
446 return null;
447 }
448
449 // ------------------------------------------------------------------------
450 // ITmfTrace trace positioning
451 // ------------------------------------------------------------------------
452
453 @Override
454 public synchronized ITmfEvent parseEvent(final ITmfContext context) {
455 final ITmfContext tmpContext = seekEvent(context.getLocation());
456 final ITmfEvent event = getNext(tmpContext);
457 return event;
458 }
459
460 @Override
461 public synchronized ITmfEvent getNext(ITmfContext context) {
462
463 // Validate the context
464 if (!(context instanceof TmfExperimentContext)) {
465 return null; // Throw an exception?
466 }
467
468 int length = getNbChildren();
469
470 // Make sure that we have something to read from
471 if (length == 0) {
472 return null;
473 }
474
475 TmfExperimentContext expContext = (TmfExperimentContext) context;
476
477 // If an event was consumed previously, first get the next one from that
478 // trace
479 final int lastTrace = expContext.getLastTrace();
480 if (lastTrace != TmfExperimentContext.NO_TRACE) {
481 final ITmfContext traceContext = expContext.getContext(lastTrace);
482 expContext.setEvent(lastTrace, ((ITmfTrace) getChild(lastTrace)).getNext(traceContext));
483 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
484 }
485
486 // Scan the candidate events and identify the "next" trace to read from
487 int trace = TmfExperimentContext.NO_TRACE;
488 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
489 for (int i = 0; i < length; i++) {
490 final ITmfEvent event = expContext.getEvent(i);
491
492 if (event != null) {
493 final ITmfTimestamp otherTS = event.getTimestamp();
494 if (otherTS.compareTo(timestamp) < 0) {
495 trace = i;
496 timestamp = otherTS;
497 }
498 }
499 }
500
501 ITmfEvent event = null;
502 if (trace != TmfExperimentContext.NO_TRACE) {
503 event = expContext.getEvent(trace);
504 if (event != null) {
505 updateAttributes(expContext, event);
506 expContext.increaseRank();
507 expContext.setLastTrace(trace);
508 final ITmfContext traceContext = expContext.getContext(trace);
509 if (traceContext == null) {
510 throw new IllegalStateException();
511 }
512
513 // Update the experiment location
514 ITmfLocation location = expContext.getLocation();
515 if (location instanceof TmfExperimentLocation) {
516 TmfLocationArray locationArray = new TmfLocationArray(
517 ((TmfExperimentLocation) location).getLocationInfo(),
518 trace, traceContext.getLocation(), traceContext.getRank());
519 expContext.setLocation(new TmfExperimentLocation(locationArray));
520 }
521 }
522 }
523
524 return event;
525 }
526
527 @Override
528 public ITmfTimestamp getInitialRangeOffset() {
529
530 List<ITmfTrace> children = getChildren(ITmfTrace.class);
531
532 if (children.isEmpty()) {
533 return super.getInitialRangeOffset();
534 }
535
536 ITmfTimestamp initTs = TmfTimestamp.BIG_CRUNCH;
537 for (ITmfTrace trace : children) {
538 ITmfTimestamp ts = (trace).getInitialRangeOffset();
539 if (ts.compareTo(initTs) < 0) {
540 initTs = ts;
541 }
542 }
543 return initTs;
544 }
545
546 /**
547 * Get the path to the folder in the supplementary file where
548 * synchronization-related data can be kept so they are not deleted when the
549 * experiment is synchronized. Analysis involved in synchronization can put
550 * their supplementary files in there so they are preserved after
551 * synchronization.
552 *
553 * If the directory does not exist, it will be created. A return value of
554 * <code>null</code> means either the trace resource does not exist or
555 * supplementary resources cannot be kept.
556 *
557 * @param absolute
558 * If <code>true</code>, it returns the absolute path in the file
559 * system, including the supplementary file path. Otherwise, it
560 * returns only the directory name.
561 * @return The path to the folder where synchronization-related
562 * supplementary files can be kept or <code>null</code> if not
563 * available.
564 */
565 public String getSynchronizationFolder(boolean absolute) {
566 /* Set up the path to the synchronization file we'll use */
567 IResource resource = this.getResource();
568 String syncDirectory = null;
569
570 try {
571 /* get the directory where the file will be stored. */
572 if (resource != null) {
573 String fullDirectory = resource.getPersistentProperty(TmfCommonConstants.TRACE_SUPPLEMENTARY_FOLDER);
574 /* Create the synchronization data directory if not present */
575 if (fullDirectory != null) {
576 fullDirectory = fullDirectory + File.separator + SYNCHRONIZATION_DIRECTORY;
577 File syncDir = new File(fullDirectory);
578 syncDir.mkdirs();
579 }
580 if (absolute) {
581 syncDirectory = fullDirectory;
582 } else {
583 syncDirectory = SYNCHRONIZATION_DIRECTORY;
584 }
585 }
586 } catch (CoreException e) {
587 return null;
588 }
589
590 return syncDirectory;
591 }
592
593 /**
594 * Synchronizes the traces of an experiment. By default it only tries to
595 * read a synchronization file if it exists
596 *
597 * @return The synchronization object
598 */
599 public SynchronizationAlgorithm synchronizeTraces() {
600 return synchronizeTraces(false);
601 }
602
603 /**
604 * Synchronizes the traces of an experiment.
605 *
606 * @param doSync
607 * Whether to actually synchronize or just try opening a sync
608 * file
609 * @return The synchronization object
610 */
611 public SynchronizationAlgorithm synchronizeTraces(boolean doSync) {
612 fSyncLock.lock();
613
614 try {
615 String syncDirectory = getSynchronizationFolder(true);
616
617 final File syncFile = (syncDirectory != null) ? new File(syncDirectory + File.separator + getSynchronizationFolder(true)) : null;
618
619 final SynchronizationAlgorithm syncAlgo = SynchronizationManager.synchronizeTraces(syncFile, Collections.singleton(this), doSync);
620
621 final TmfTraceSynchronizedSignal signal = new TmfTraceSynchronizedSignal(this, syncAlgo);
622
623 /* Broadcast in separate thread to prevent deadlock */
624 new Thread() {
625 @Override
626 public void run() {
627 broadcast(signal);
628 }
629 }.start();
630
631 return syncAlgo;
632 } finally {
633 fSyncLock.unlock();
634 }
635 }
636
637 @Override
638 @SuppressWarnings("nls")
639 public synchronized String toString() {
640 return "[TmfExperiment (" + getName() + ")]";
641 }
642
643 // ------------------------------------------------------------------------
644 // Streaming support
645 // ------------------------------------------------------------------------
646
647 private synchronized void initializeStreamingMonitor() {
648
649 if (fInitialized) {
650 return;
651 }
652 fInitialized = true;
653
654 if (getStreamingInterval() == 0) {
655 final ITmfContext context = seekEvent(0);
656 final ITmfEvent event = getNext(context);
657 context.dispose();
658 if (event == null) {
659 return;
660 }
661 final TmfTimeRange timeRange = new TmfTimeRange(event.getTimestamp(), TmfTimestamp.BIG_CRUNCH);
662 final TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(this, this, timeRange);
663
664 // Broadcast in separate thread to prevent deadlock
665 new Thread() {
666 @Override
667 public void run() {
668 broadcast(signal);
669 }
670 }.start();
671 return;
672 }
673
674 final Thread thread = new Thread("Streaming Monitor for experiment " + getName()) { //$NON-NLS-1$
675 private ITmfTimestamp safeTimestamp = null;
676 private ITmfTimestamp lastSafeTimestamp = null;
677 private TmfTimeRange timeRange = null;
678
679 @Override
680 public void run() {
681 while (!executorIsShutdown()) {
682 if (!getIndexer().isIndexing()) {
683 ITmfTimestamp startTimestamp = TmfTimestamp.BIG_CRUNCH;
684 ITmfTimestamp endTimestamp = TmfTimestamp.BIG_BANG;
685
686 for (final ITmfTrace trace : getChildren(ITmfTrace.class)) {
687 if (trace.getStartTime().compareTo(startTimestamp) < 0) {
688 startTimestamp = trace.getStartTime();
689 }
690 if (trace.getStreamingInterval() != 0 && trace.getEndTime().compareTo(endTimestamp) > 0) {
691 endTimestamp = trace.getEndTime();
692 }
693 }
694 ITmfTimestamp safeTs = safeTimestamp;
695 if (safeTs != null && (lastSafeTimestamp == null || safeTs.compareTo(lastSafeTimestamp) > 0)) {
696 timeRange = new TmfTimeRange(startTimestamp, safeTs);
697 lastSafeTimestamp = safeTs;
698 } else {
699 timeRange = null;
700 }
701 safeTimestamp = endTimestamp;
702 if (timeRange != null) {
703 final TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(TmfExperiment.this, TmfExperiment.this, timeRange);
704 broadcast(signal);
705 }
706 }
707 try {
708 Thread.sleep(getStreamingInterval());
709 } catch (final InterruptedException e) {
710 e.printStackTrace();
711 }
712 }
713 }
714 };
715 thread.start();
716 }
717
718 @Override
719 public long getStreamingInterval() {
720 long interval = 0;
721 for (final ITmfTrace trace : getChildren(ITmfTrace.class)) {
722 interval = Math.max(interval, trace.getStreamingInterval());
723 }
724 return interval;
725 }
726
727 // ------------------------------------------------------------------------
728 // Signal handlers
729 // ------------------------------------------------------------------------
730
731 @Override
732 @TmfSignalHandler
733 public void traceOpened(TmfTraceOpenedSignal signal) {
734 if (signal.getTrace() == this) {
735 initializeStreamingMonitor();
736
737 /* Initialize the analysis */
738 MultiStatus status = new MultiStatus(Activator.PLUGIN_ID, IStatus.OK, null, null);
739 status.add(executeAnalysis());
740 if (!status.isOK()) {
741 Activator.log(status);
742 }
743 TmfTraceManager.refreshSupplementaryFiles(this);
744 }
745 }
746
747 @Override
748 public synchronized int getCheckpointSize() {
749 int totalCheckpointSize = 0;
750 try {
751 List<ITmfTrace> children = getChildren(ITmfTrace.class);
752 for (ITmfTrace trace : children) {
753 if (!(trace instanceof ITmfPersistentlyIndexable)) {
754 return 0;
755 }
756
757 ITmfPersistentlyIndexable persistableIndexTrace = (ITmfPersistentlyIndexable) trace;
758 int currentTraceCheckpointSize = persistableIndexTrace.getCheckpointSize();
759 if (currentTraceCheckpointSize <= 0) {
760 return 0;
761 }
762 totalCheckpointSize += currentTraceCheckpointSize;
763 // each entry in the TmfLocationArray has a rank in addition
764 // of the location
765 totalCheckpointSize += 8;
766 }
767 } catch (UnsupportedOperationException e) {
768 return 0;
769 }
770
771 return totalCheckpointSize;
772 }
773
774 @Override
775 public ITmfLocation restoreLocation(ByteBuffer bufferIn) {
776 List<ITmfTrace> children = getChildren(ITmfTrace.class);
777 int length = children.size();
778 ITmfLocation[] locations = new ITmfLocation[length];
779 long[] ranks = new long[length];
780 for (int i = 0; i < length; ++i) {
781 final ITmfTrace trace = children.get(i);
782 locations[i] = ((ITmfPersistentlyIndexable) trace).restoreLocation(bufferIn);
783 ranks[i] = bufferIn.getLong();
784 }
785 TmfLocationArray arr = new TmfLocationArray(locations, ranks);
786 TmfExperimentLocation l = new TmfExperimentLocation(arr);
787 return l;
788 }
789 }
This page took 0.055706 seconds and 6 git commands to generate.