2e1d30dded3c35c40941c9bb6117a5999ff35651
[deliverable/tracecompass.git] / tmf / org.eclipse.tracecompass.tmf.core / src / org / eclipse / tracecompass / tmf / core / trace / experiment / TmfExperiment.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2015 Ericsson, École Polytechnique de Montréal
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 * Francois Chouinard - Updated as per TMF Trace Model 1.0
12 * Patrick Tasse - Updated for removal of context clone
13 * Patrick Tasse - Updated for ranks in experiment location
14 * Geneviève Bastien - Added support of experiment synchronization
15 * Added the initExperiment method and default constructor
16 * Bernd Hufmann - Updated for added interfaces to ITmfEventProvider
17 *******************************************************************************/
18
19 package org.eclipse.tracecompass.tmf.core.trace.experiment;
20
21 import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNull;
22
23 import java.io.File;
24 import java.math.BigInteger;
25 import java.nio.ByteBuffer;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.concurrent.locks.Lock;
30 import java.util.concurrent.locks.ReentrantLock;
31 import java.util.function.Function;
32 import java.util.stream.Collectors;
33
34 import org.eclipse.core.resources.IProject;
35 import org.eclipse.core.resources.IResource;
36 import org.eclipse.core.runtime.CoreException;
37 import org.eclipse.core.runtime.IStatus;
38 import org.eclipse.core.runtime.MultiStatus;
39 import org.eclipse.core.runtime.Status;
40 import org.eclipse.jdt.annotation.NonNull;
41 import org.eclipse.jdt.annotation.Nullable;
42 import org.eclipse.tracecompass.internal.tmf.core.Activator;
43 import org.eclipse.tracecompass.internal.tmf.core.synchronization.TmfTimestampTransform;
44 import org.eclipse.tracecompass.internal.tmf.core.trace.experiment.TmfExperimentContext;
45 import org.eclipse.tracecompass.internal.tmf.core.trace.experiment.TmfExperimentLocation;
46 import org.eclipse.tracecompass.internal.tmf.core.trace.experiment.TmfLocationArray;
47 import org.eclipse.tracecompass.tmf.core.TmfCommonConstants;
48 import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
49 import org.eclipse.tracecompass.tmf.core.exceptions.TmfTraceException;
50 import org.eclipse.tracecompass.tmf.core.project.model.ITmfPropertiesProvider;
51 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest;
52 import org.eclipse.tracecompass.tmf.core.signal.TmfSignalHandler;
53 import org.eclipse.tracecompass.tmf.core.signal.TmfTraceOpenedSignal;
54 import org.eclipse.tracecompass.tmf.core.signal.TmfTraceRangeUpdatedSignal;
55 import org.eclipse.tracecompass.tmf.core.signal.TmfTraceSynchronizedSignal;
56 import org.eclipse.tracecompass.tmf.core.synchronization.ITmfTimestampTransform;
57 import org.eclipse.tracecompass.tmf.core.synchronization.SynchronizationAlgorithm;
58 import org.eclipse.tracecompass.tmf.core.synchronization.SynchronizationManager;
59 import org.eclipse.tracecompass.tmf.core.synchronization.TimestampTransformFactory;
60 import org.eclipse.tracecompass.tmf.core.timestamp.ITmfTimestamp;
61 import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimeRange;
62 import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimestamp;
63 import org.eclipse.tracecompass.tmf.core.trace.ITmfContext;
64 import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace;
65 import org.eclipse.tracecompass.tmf.core.trace.TmfTrace;
66 import org.eclipse.tracecompass.tmf.core.trace.TmfTraceManager;
67 import org.eclipse.tracecompass.tmf.core.trace.indexer.ITmfPersistentlyIndexable;
68 import org.eclipse.tracecompass.tmf.core.trace.indexer.ITmfTraceIndexer;
69 import org.eclipse.tracecompass.tmf.core.trace.indexer.TmfBTreeTraceIndexer;
70 import org.eclipse.tracecompass.tmf.core.trace.location.ITmfLocation;
71
72 import com.google.common.collect.HashMultimap;
73 import com.google.common.collect.Multimap;
74
75 /**
76 * TmfExperiment presents a time-ordered, unified view of a set of ITmfTrace:s
77 * that are part of a tracing experiment.
78 *
79 * @version 1.0
80 * @author Francois Chouinard
81 */
82 public class TmfExperiment extends TmfTrace implements ITmfPersistentlyIndexable {
83
84 // ------------------------------------------------------------------------
85 // Constants
86 // ------------------------------------------------------------------------
87
88 /**
89 * The file name of the Synchronization
90 */
91 private static final String SYNCHRONIZATION_FILE_NAME = "synchronization.bin"; //$NON-NLS-1$
92
93 /**
94 * The name of the directory containing trace synchronization data. This
95 * directory typically will be preserved when traces are synchronized.
96 * Analysis involved in synchronization can put their supplementary files in
97 * there so they are not deleted when synchronized traces are copied.
98 */
99 private static final String SYNCHRONIZATION_DIRECTORY = "sync_data"; //$NON-NLS-1$
100
101 /**
102 * The default index page size
103 */
104 public static final int DEFAULT_INDEX_PAGE_SIZE = 5000;
105
106 /**
107 * Property name for traces defining a clock offset.
108 */
109 private static final String CLOCK_OFFSET_PROPERTY = "clock_offset"; //$NON-NLS-1$
110
111 /**
112 * If the automatic clock offset is higher than this value, emit a warning.
113 */
114 private static final long CLOCK_OFFSET_THRESHOLD_NS = 500000;
115
116 // ------------------------------------------------------------------------
117 // Attributes
118 // ------------------------------------------------------------------------
119
120 /**
121 * The set of traces that constitute the experiment
122 */
123 private boolean fInitialized = false;
124
125 /**
126 * Lock for synchronization methods. These methods cannot be 'synchronized'
127 * since it makes it impossible to use an event request on the experiment
128 * during synchronization (the request thread would block)
129 */
130 private final Lock fSyncLock = new ReentrantLock();
131
132 // ------------------------------------------------------------------------
133 // Construction
134 // ------------------------------------------------------------------------
135
136 /**
137 * Default constructor. Should not be used directly, but is needed for
138 * extension points.
139 *
140 * @deprecated Do not call this directly (but do not remove it either!)
141 */
142 @Deprecated
143 public TmfExperiment() {
144 super();
145 }
146
147 /**
148 * Constructor of an experiment, taking the type, path, traces,
149 * indexPageSize and resource
150 *
151 * @param type
152 * The event type
153 * @param path
154 * The experiment path
155 * @param traces
156 * The experiment set of traces
157 * @param indexPageSize
158 * The experiment index page size. You can use
159 * {@link TmfExperiment#DEFAULT_INDEX_PAGE_SIZE} for a default
160 * value.
161 * @param resource
162 * The resource associated to the experiment. You can use 'null'
163 * for no resources (tests, etc.)
164 */
165 public TmfExperiment(final Class<? extends ITmfEvent> type,
166 final String path,
167 final ITmfTrace[] traces,
168 final int indexPageSize,
169 final @Nullable IResource resource) {
170 initExperiment(type, path, traces, indexPageSize, resource);
171 }
172
173 @Override
174 protected ITmfTraceIndexer createIndexer(int interval) {
175 if (getCheckpointSize() > 0) {
176 return new TmfBTreeTraceIndexer(this, interval);
177 }
178 return super.createIndexer(interval);
179 }
180
181 /**
182 * Clears the experiment
183 */
184 @Override
185 public synchronized void dispose() {
186
187 // Clean up the index if applicable
188 if (getIndexer() != null) {
189 getIndexer().dispose();
190 }
191
192 super.dispose();
193 }
194
195 // ------------------------------------------------------------------------
196 // ITmfTrace - Initializers
197 // ------------------------------------------------------------------------
198
199 @Override
200 public void initTrace(final IResource resource, final String path, final Class<? extends ITmfEvent> type) {
201 /* Do nothing for experiments */
202 }
203
204 /**
205 * Initialization of an experiment, taking the type, path, traces,
206 * indexPageSize and resource
207 *
208 * @param type
209 * the event type
210 * @param path
211 * the experiment path
212 * @param traces
213 * the experiment set of traces
214 * @param indexPageSize
215 * the experiment index page size
216 * @param resource
217 * the resource associated to the experiment
218 */
219 public void initExperiment(final Class<? extends ITmfEvent> type,
220 final String path,
221 final ITmfTrace[] traces,
222 final int indexPageSize,
223 final @Nullable IResource resource) {
224
225 setCacheSize(indexPageSize);
226 setStreamingInterval(0);
227
228 Multimap<String, ITmfTrace> tracesPerHost = HashMultimap.create();
229
230 // traces have to be set before super.initialize()
231 if (traces != null) {
232 // initialize
233 for (ITmfTrace trace : traces) {
234 if (trace != null) {
235 tracesPerHost.put(trace.getHostId(), trace);
236 addChild(trace);
237 }
238 }
239 }
240
241 try {
242 super.initialize(resource, path, type);
243 } catch (TmfTraceException e) {
244 Activator.logError("Error initializing experiment", e); //$NON-NLS-1$
245 }
246
247 if (resource != null) {
248 synchronizeTraces();
249 }
250
251 /*
252 * For all traces on the same host, if two or more specify different
253 * clock offsets, adjust their clock offset by the average of all of them.
254 *
255 * See https://bugs.eclipse.org/bugs/show_bug.cgi?id=484620
256 */
257 Function<ITmfPropertiesProvider, @Nullable Long> offsetGetter = trace -> {
258 String offset = trace.getProperties().get(CLOCK_OFFSET_PROPERTY);
259 if (offset == null) {
260 return null;
261 }
262 try {
263 return Long.parseLong(offset);
264 } catch (NumberFormatException e) {
265 return null;
266 }
267 };
268
269 for (String host : tracesPerHost.keySet()) {
270 /*
271 * Only attempt to synchronize traces that provide a clock_offset
272 * property.
273 */
274 Collection<ITmfPropertiesProvider> tracesToSynchronize = tracesPerHost.get(host).stream()
275 .filter(trace -> trace instanceof ITmfPropertiesProvider)
276 .map(trace -> (ITmfPropertiesProvider) trace)
277 .filter(trace -> offsetGetter.apply(trace) != null)
278 .collect(Collectors.toList());
279
280 if (tracesToSynchronize.size() < 2) {
281 continue;
282 }
283
284 /* Only synchronize traces if they haven't previously been synchronized */
285 if (tracesToSynchronize.stream()
286 .map(trace -> ((ITmfTrace) trace).getTimestampTransform())
287 .anyMatch(transform -> !transform.equals(TmfTimestampTransform.IDENTITY))) {
288 continue;
289 }
290
291 /* Calculate the average of all clock offsets */
292 BigInteger sum = BigInteger.ZERO;
293 for (ITmfPropertiesProvider trace : tracesToSynchronize) {
294 long offset = checkNotNull(offsetGetter.apply(trace));
295 sum = sum.add(BigInteger.valueOf(offset));
296 }
297 long average = sum.divide(BigInteger.valueOf(tracesToSynchronize.size())).longValue();
298
299 if (average > CLOCK_OFFSET_THRESHOLD_NS) {
300 Activator.logWarning("Average clock correction (" + average + ") is higher than threshold of " + //$NON-NLS-1$ //$NON-NLS-2$
301 CLOCK_OFFSET_THRESHOLD_NS + " ns for experiment " + this.toString()); //$NON-NLS-1$
302 }
303
304 /*
305 * Apply the offset correction to all identified traces, but only if
306 * they do not already have an equivalent one (for example, closing
307 * and re-opening the same experiment should not retrigger building
308 * all supplementary files).
309 */
310 tracesToSynchronize.forEach(t -> {
311 long offset = checkNotNull(offsetGetter.apply(t));
312 long delta = average - offset;
313
314 ITmfTrace trace = (ITmfTrace) t;
315 ITmfTimestampTransform currentTransform = trace.getTimestampTransform();
316 ITmfTimestampTransform newTransform = TimestampTransformFactory.createWithOffset(delta);
317
318 if (!newTransform.equals(currentTransform)) {
319 TmfTraceManager.deleteSupplementaryFiles(trace);
320 trace.setTimestampTransform(newTransform);
321 }
322 });
323 }
324 }
325
326 @Override
327 public IStatus validate(final IProject project, final String path) {
328 return Status.OK_STATUS;
329 }
330
331 // ------------------------------------------------------------------------
332 // Accessors
333 // ------------------------------------------------------------------------
334
335 /**
336 * Get the traces contained in this experiment.
337 *
338 * @return The array of contained traces
339 */
340 public List<@NonNull ITmfTrace> getTraces() {
341 return getChildren(ITmfTrace.class);
342 }
343
344 /**
345 * Returns the timestamp of the event at the requested index. If none,
346 * returns null.
347 *
348 * @param index
349 * the event index (rank)
350 * @return the corresponding event timestamp
351 */
352 public ITmfTimestamp getTimestamp(final int index) {
353 final ITmfContext context = seekEvent(index);
354 final ITmfEvent event = getNext(context);
355 context.dispose();
356 return (event != null) ? event.getTimestamp() : null;
357 }
358
359 // ------------------------------------------------------------------------
360 // Request management
361 // ------------------------------------------------------------------------
362
363 @Override
364 public synchronized ITmfContext armRequest(final ITmfEventRequest request) {
365
366 // Make sure we have something to read from
367 if (getChildren().isEmpty()) {
368 return null;
369 }
370
371 if (!TmfTimestamp.BIG_BANG.equals(request.getRange().getStartTime())
372 && request.getIndex() == 0) {
373 final ITmfContext context = seekEvent(request.getRange().getStartTime());
374 request.setStartIndex((int) context.getRank());
375 return context;
376
377 }
378
379 return seekEvent(request.getIndex());
380 }
381
382 // ------------------------------------------------------------------------
383 // ITmfTrace trace positioning
384 // ------------------------------------------------------------------------
385
386 @Override
387 public synchronized ITmfContext seekEvent(final ITmfLocation location) {
388 // Validate the location
389 if (location != null && !(location instanceof TmfExperimentLocation)) {
390 return null; // Throw an exception?
391 }
392
393 int length = getNbChildren();
394
395 // Initialize the location array if necessary
396 TmfLocationArray locationArray = ((location == null) ? new TmfLocationArray(length) : ((TmfExperimentLocation) location).getLocationInfo());
397
398 ITmfLocation[] locations = locationArray.getLocations();
399 long[] ranks = locationArray.getRanks();
400
401 // Create and populate the context's traces contexts
402 final TmfExperimentContext context = new TmfExperimentContext(length);
403
404 // Position the traces
405 long rank = 0;
406 for (int i = 0; i < length; i++) {
407 // Get the relevant trace attributes
408 final ITmfContext traceContext = ((ITmfTrace) getChild(i)).seekEvent(locations[i]);
409 context.setContext(i, traceContext);
410 traceContext.setRank(ranks[i]);
411 // update location after seek
412 locations[i] = traceContext.getLocation();
413 context.setEvent(i, ((ITmfTrace) getChild(i)).getNext(traceContext));
414 rank += ranks[i];
415 }
416
417 // Finalize context
418 context.setLocation(new TmfExperimentLocation(new TmfLocationArray(locations, ranks)));
419 context.setLastTrace(TmfExperimentContext.NO_TRACE);
420 context.setRank(rank);
421
422 return context;
423 }
424
425 // ------------------------------------------------------------------------
426 // ITmfTrace - SeekEvent operations (returning a trace context)
427 // ------------------------------------------------------------------------
428
429 @Override
430 public ITmfContext seekEvent(final double ratio) {
431 final ITmfContext context = seekEvent(Math.round(ratio * getNbEvents()));
432 return context;
433 }
434
435 @Override
436 public double getLocationRatio(final ITmfLocation location) {
437 if (location instanceof TmfExperimentLocation) {
438 long rank = 0;
439 TmfLocationArray locationArray = ((TmfExperimentLocation) location).getLocationInfo();
440 for (int i = 0; i < locationArray.size(); i++) {
441 rank += locationArray.getRank(i);
442 }
443 return (double) rank / getNbEvents();
444 }
445 return 0.0;
446 }
447
448 @Override
449 public ITmfLocation getCurrentLocation() {
450 // never used
451 return null;
452 }
453
454 // ------------------------------------------------------------------------
455 // ITmfTrace trace positioning
456 // ------------------------------------------------------------------------
457
458 @Override
459 public synchronized ITmfEvent parseEvent(final ITmfContext context) {
460 final ITmfContext tmpContext = seekEvent(context.getLocation());
461 final ITmfEvent event = getNext(tmpContext);
462 return event;
463 }
464
465 @Override
466 public synchronized ITmfEvent getNext(ITmfContext context) {
467
468 // Validate the context
469 if (!(context instanceof TmfExperimentContext)) {
470 return null; // Throw an exception?
471 }
472
473 int length = getNbChildren();
474
475 // Make sure that we have something to read from
476 if (length == 0) {
477 return null;
478 }
479
480 TmfExperimentContext expContext = (TmfExperimentContext) context;
481
482 // If an event was consumed previously, first get the next one from that
483 // trace
484 final int lastTrace = expContext.getLastTrace();
485 if (lastTrace != TmfExperimentContext.NO_TRACE) {
486 final ITmfContext traceContext = expContext.getContext(lastTrace);
487 expContext.setEvent(lastTrace, ((ITmfTrace) getChild(lastTrace)).getNext(traceContext));
488 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
489 }
490
491 // Scan the candidate events and identify the "next" trace to read from
492 int trace = TmfExperimentContext.NO_TRACE;
493 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
494 for (int i = 0; i < length; i++) {
495 final ITmfEvent event = expContext.getEvent(i);
496
497 if (event != null) {
498 final ITmfTimestamp otherTS = event.getTimestamp();
499 if (otherTS.compareTo(timestamp) < 0) {
500 trace = i;
501 timestamp = otherTS;
502 }
503 }
504 }
505
506 ITmfEvent event = null;
507 if (trace != TmfExperimentContext.NO_TRACE) {
508 event = expContext.getEvent(trace);
509 if (event != null) {
510 updateAttributes(expContext, event);
511 expContext.increaseRank();
512 expContext.setLastTrace(trace);
513 final ITmfContext traceContext = expContext.getContext(trace);
514 if (traceContext == null) {
515 throw new IllegalStateException();
516 }
517
518 // Update the experiment location
519 ITmfLocation location = expContext.getLocation();
520 if (location instanceof TmfExperimentLocation) {
521 TmfLocationArray locationArray = new TmfLocationArray(
522 ((TmfExperimentLocation) location).getLocationInfo(),
523 trace, traceContext.getLocation(), traceContext.getRank());
524 expContext.setLocation(new TmfExperimentLocation(locationArray));
525 }
526 }
527 }
528
529 return event;
530 }
531
532 @Override
533 public ITmfTimestamp getInitialRangeOffset() {
534
535 List<ITmfTrace> children = getChildren(ITmfTrace.class);
536
537 if (children.isEmpty()) {
538 return super.getInitialRangeOffset();
539 }
540
541 ITmfTimestamp initTs = TmfTimestamp.BIG_CRUNCH;
542 for (ITmfTrace trace : children) {
543 ITmfTimestamp ts = (trace).getInitialRangeOffset();
544 if (ts.compareTo(initTs) < 0) {
545 initTs = ts;
546 }
547 }
548 return initTs;
549 }
550
551 /**
552 * Get the path to the folder in the supplementary file where
553 * synchronization-related data can be kept so they are not deleted when the
554 * experiment is synchronized. Analysis involved in synchronization can put
555 * their supplementary files in there so they are preserved after
556 * synchronization.
557 *
558 * If the directory does not exist, it will be created. A return value of
559 * <code>null</code> means either the trace resource does not exist or
560 * supplementary resources cannot be kept.
561 *
562 * @param absolute
563 * If <code>true</code>, it returns the absolute path in the file
564 * system, including the supplementary file path. Otherwise, it
565 * returns only the directory name.
566 * @return The path to the folder where synchronization-related
567 * supplementary files can be kept or <code>null</code> if not
568 * available.
569 */
570 public String getSynchronizationFolder(boolean absolute) {
571 /* Set up the path to the synchronization file we'll use */
572 IResource resource = this.getResource();
573 String syncDirectory = null;
574
575 try {
576 /* get the directory where the file will be stored. */
577 if (resource != null) {
578 String fullDirectory = resource.getPersistentProperty(TmfCommonConstants.TRACE_SUPPLEMENTARY_FOLDER);
579 /* Create the synchronization data directory if not present */
580 if (fullDirectory != null) {
581 fullDirectory = fullDirectory + File.separator + SYNCHRONIZATION_DIRECTORY;
582 File syncDir = new File(fullDirectory);
583 syncDir.mkdirs();
584 }
585 if (absolute) {
586 syncDirectory = fullDirectory;
587 } else {
588 syncDirectory = SYNCHRONIZATION_DIRECTORY;
589 }
590 }
591 } catch (CoreException e) {
592 return null;
593 }
594
595 return syncDirectory;
596 }
597
598 /**
599 * Synchronizes the traces of an experiment. By default it only tries to
600 * read a synchronization file if it exists
601 *
602 * @return The synchronization object
603 */
604 public SynchronizationAlgorithm synchronizeTraces() {
605 return synchronizeTraces(false);
606 }
607
608 /**
609 * Synchronizes the traces of an experiment.
610 *
611 * @param doSync
612 * Whether to actually synchronize or just try opening a sync
613 * file
614 * @return The synchronization object
615 */
616 public SynchronizationAlgorithm synchronizeTraces(boolean doSync) {
617 fSyncLock.lock();
618
619 try {
620 String syncDirectory = getSynchronizationFolder(true);
621
622 final File syncFile = (syncDirectory != null) ? new File(syncDirectory + File.separator + SYNCHRONIZATION_FILE_NAME) : null;
623
624 final SynchronizationAlgorithm syncAlgo = SynchronizationManager.synchronizeTraces(syncFile, Collections.singleton(this), doSync);
625
626 final TmfTraceSynchronizedSignal signal = new TmfTraceSynchronizedSignal(this, syncAlgo);
627
628 /* Broadcast in separate thread to prevent deadlock */
629 new Thread() {
630 @Override
631 public void run() {
632 broadcast(signal);
633 }
634 }.start();
635
636 return syncAlgo;
637 } finally {
638 fSyncLock.unlock();
639 }
640 }
641
642 @Override
643 @SuppressWarnings("nls")
644 public synchronized String toString() {
645 return "[TmfExperiment (" + getName() + ")]";
646 }
647
648 // ------------------------------------------------------------------------
649 // Streaming support
650 // ------------------------------------------------------------------------
651
652 private synchronized void initializeStreamingMonitor() {
653
654 if (fInitialized) {
655 return;
656 }
657 fInitialized = true;
658
659 if (getStreamingInterval() == 0) {
660 final ITmfContext context = seekEvent(0);
661 final ITmfEvent event = getNext(context);
662 context.dispose();
663 if (event == null) {
664 return;
665 }
666 final TmfTimeRange timeRange = new TmfTimeRange(event.getTimestamp(), TmfTimestamp.BIG_CRUNCH);
667 final TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(this, this, timeRange);
668
669 // Broadcast in separate thread to prevent deadlock
670 new Thread() {
671 @Override
672 public void run() {
673 broadcast(signal);
674 }
675 }.start();
676 return;
677 }
678
679 final Thread thread = new Thread("Streaming Monitor for experiment " + getName()) { //$NON-NLS-1$
680 private ITmfTimestamp safeTimestamp = null;
681 private ITmfTimestamp lastSafeTimestamp = null;
682 private TmfTimeRange timeRange = null;
683
684 @Override
685 public void run() {
686 while (!executorIsShutdown()) {
687 if (!getIndexer().isIndexing()) {
688 ITmfTimestamp startTimestamp = TmfTimestamp.BIG_CRUNCH;
689 ITmfTimestamp endTimestamp = TmfTimestamp.BIG_BANG;
690
691 for (final ITmfTrace trace : getChildren(ITmfTrace.class)) {
692 if (trace.getStartTime().compareTo(startTimestamp) < 0) {
693 startTimestamp = trace.getStartTime();
694 }
695 if (trace.getStreamingInterval() != 0 && trace.getEndTime().compareTo(endTimestamp) > 0) {
696 endTimestamp = trace.getEndTime();
697 }
698 }
699 ITmfTimestamp safeTs = safeTimestamp;
700 if (safeTs != null && (lastSafeTimestamp == null || safeTs.compareTo(lastSafeTimestamp) > 0)) {
701 timeRange = new TmfTimeRange(startTimestamp, safeTs);
702 lastSafeTimestamp = safeTs;
703 } else {
704 timeRange = null;
705 }
706 safeTimestamp = endTimestamp;
707 if (timeRange != null) {
708 final TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(TmfExperiment.this, TmfExperiment.this, timeRange);
709 broadcast(signal);
710 }
711 }
712 try {
713 Thread.sleep(getStreamingInterval());
714 } catch (final InterruptedException e) {
715 e.printStackTrace();
716 }
717 }
718 }
719 };
720 thread.start();
721 }
722
723 @Override
724 public long getStreamingInterval() {
725 long interval = 0;
726 for (final ITmfTrace trace : getChildren(ITmfTrace.class)) {
727 interval = Math.max(interval, trace.getStreamingInterval());
728 }
729 return interval;
730 }
731
732 // ------------------------------------------------------------------------
733 // Signal handlers
734 // ------------------------------------------------------------------------
735
736 @Override
737 @TmfSignalHandler
738 public void traceOpened(TmfTraceOpenedSignal signal) {
739 if (signal.getTrace() == this) {
740 initializeStreamingMonitor();
741
742 /* Initialize the analysis */
743 MultiStatus status = new MultiStatus(Activator.PLUGIN_ID, IStatus.OK, null, null);
744 status.add(executeAnalysis());
745 if (!status.isOK()) {
746 Activator.log(status);
747 }
748 TmfTraceManager.refreshSupplementaryFiles(this);
749 }
750 }
751
752 @Override
753 public synchronized int getCheckpointSize() {
754 int totalCheckpointSize = 0;
755 try {
756 List<ITmfTrace> children = getChildren(ITmfTrace.class);
757 for (ITmfTrace trace : children) {
758 if (!(trace instanceof ITmfPersistentlyIndexable)) {
759 return 0;
760 }
761
762 ITmfPersistentlyIndexable persistableIndexTrace = (ITmfPersistentlyIndexable) trace;
763 int currentTraceCheckpointSize = persistableIndexTrace.getCheckpointSize();
764 if (currentTraceCheckpointSize <= 0) {
765 return 0;
766 }
767 totalCheckpointSize += currentTraceCheckpointSize;
768 // each entry in the TmfLocationArray has a rank in addition
769 // of the location
770 totalCheckpointSize += 8;
771 }
772 } catch (UnsupportedOperationException e) {
773 return 0;
774 }
775
776 return totalCheckpointSize;
777 }
778
779 @Override
780 public ITmfLocation restoreLocation(ByteBuffer bufferIn) {
781 List<ITmfTrace> children = getChildren(ITmfTrace.class);
782 int length = children.size();
783 ITmfLocation[] locations = new ITmfLocation[length];
784 long[] ranks = new long[length];
785 for (int i = 0; i < length; ++i) {
786 final ITmfTrace trace = children.get(i);
787 locations[i] = ((ITmfPersistentlyIndexable) trace).restoreLocation(bufferIn);
788 ranks[i] = bufferIn.getLong();
789 }
790 TmfLocationArray arr = new TmfLocationArray(locations, ranks);
791 TmfExperimentLocation l = new TmfExperimentLocation(arr);
792 return l;
793 }
794 }
This page took 0.057558 seconds and 5 git commands to generate.