tmf: Automatically sync experiments set up with the same hosts
[deliverable/tracecompass.git] / tmf / org.eclipse.tracecompass.tmf.core / src / org / eclipse / tracecompass / tmf / core / trace / experiment / TmfExperiment.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2015 Ericsson, École Polytechnique de Montréal
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 * Francois Chouinard - Updated as per TMF Trace Model 1.0
12 * Patrick Tasse - Updated for removal of context clone
13 * Patrick Tasse - Updated for ranks in experiment location
14 * Geneviève Bastien - Added support of experiment synchronization
15 * Added the initExperiment method and default constructor
16 * Bernd Hufmann - Updated for added interfaces to ITmfEventProvider
17 *******************************************************************************/
18
19 package org.eclipse.tracecompass.tmf.core.trace.experiment;
20
21 import static org.eclipse.tracecompass.common.core.NonNullUtils.checkNotNull;
22
23 import java.io.File;
24 import java.math.BigInteger;
25 import java.nio.ByteBuffer;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.concurrent.locks.Lock;
30 import java.util.concurrent.locks.ReentrantLock;
31 import java.util.function.Function;
32 import java.util.stream.Collectors;
33
34 import org.eclipse.core.resources.IProject;
35 import org.eclipse.core.resources.IResource;
36 import org.eclipse.core.runtime.CoreException;
37 import org.eclipse.core.runtime.IStatus;
38 import org.eclipse.core.runtime.MultiStatus;
39 import org.eclipse.core.runtime.Status;
40 import org.eclipse.jdt.annotation.NonNull;
41 import org.eclipse.jdt.annotation.Nullable;
42 import org.eclipse.tracecompass.internal.tmf.core.Activator;
43 import org.eclipse.tracecompass.internal.tmf.core.synchronization.TmfTimestampTransform;
44 import org.eclipse.tracecompass.internal.tmf.core.trace.experiment.TmfExperimentContext;
45 import org.eclipse.tracecompass.internal.tmf.core.trace.experiment.TmfExperimentLocation;
46 import org.eclipse.tracecompass.internal.tmf.core.trace.experiment.TmfLocationArray;
47 import org.eclipse.tracecompass.tmf.core.TmfCommonConstants;
48 import org.eclipse.tracecompass.tmf.core.event.ITmfEvent;
49 import org.eclipse.tracecompass.tmf.core.exceptions.TmfTraceException;
50 import org.eclipse.tracecompass.tmf.core.project.model.ITmfPropertiesProvider;
51 import org.eclipse.tracecompass.tmf.core.request.ITmfEventRequest;
52 import org.eclipse.tracecompass.tmf.core.signal.TmfSignalHandler;
53 import org.eclipse.tracecompass.tmf.core.signal.TmfTraceOpenedSignal;
54 import org.eclipse.tracecompass.tmf.core.signal.TmfTraceRangeUpdatedSignal;
55 import org.eclipse.tracecompass.tmf.core.signal.TmfTraceSynchronizedSignal;
56 import org.eclipse.tracecompass.tmf.core.synchronization.ITmfTimestampTransform;
57 import org.eclipse.tracecompass.tmf.core.synchronization.SynchronizationAlgorithm;
58 import org.eclipse.tracecompass.tmf.core.synchronization.SynchronizationManager;
59 import org.eclipse.tracecompass.tmf.core.synchronization.TimestampTransformFactory;
60 import org.eclipse.tracecompass.tmf.core.timestamp.ITmfTimestamp;
61 import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimeRange;
62 import org.eclipse.tracecompass.tmf.core.timestamp.TmfTimestamp;
63 import org.eclipse.tracecompass.tmf.core.trace.ITmfContext;
64 import org.eclipse.tracecompass.tmf.core.trace.ITmfTrace;
65 import org.eclipse.tracecompass.tmf.core.trace.TmfTrace;
66 import org.eclipse.tracecompass.tmf.core.trace.TmfTraceManager;
67 import org.eclipse.tracecompass.tmf.core.trace.indexer.ITmfPersistentlyIndexable;
68 import org.eclipse.tracecompass.tmf.core.trace.indexer.ITmfTraceIndexer;
69 import org.eclipse.tracecompass.tmf.core.trace.indexer.TmfBTreeTraceIndexer;
70 import org.eclipse.tracecompass.tmf.core.trace.location.ITmfLocation;
71
72 import com.google.common.collect.HashMultimap;
73 import com.google.common.collect.Multimap;
74
75 /**
76 * TmfExperiment presents a time-ordered, unified view of a set of ITmfTrace:s
77 * that are part of a tracing experiment.
78 *
79 * @version 1.0
80 * @author Francois Chouinard
81 */
82 public class TmfExperiment extends TmfTrace implements ITmfPersistentlyIndexable {
83
84 // ------------------------------------------------------------------------
85 // Constants
86 // ------------------------------------------------------------------------
87
88 /**
89 * The file name of the Synchronization
90 *
91 * @deprecated This file name shouldn't be used directly anymore. All
92 * synchronization files have been moved to a folder and you
93 * should use the {@link #getSynchronizationFolder(boolean)}
94 * method to return the path to this folder.
95 */
96 @Deprecated
97 public static final String SYNCHRONIZATION_FILE_NAME = "synchronization.bin"; //$NON-NLS-1$
98
99 /**
100 * The name of the directory containing trace synchronization data. This
101 * directory typically will be preserved when traces are synchronized.
102 * Analysis involved in synchronization can put their supplementary files in
103 * there so they are not deleted when synchronized traces are copied.
104 */
105 private static final String SYNCHRONIZATION_DIRECTORY = "sync_data"; //$NON-NLS-1$
106
107 /**
108 * The default index page size
109 */
110 public static final int DEFAULT_INDEX_PAGE_SIZE = 5000;
111
112 /**
113 * Property name for traces defining a clock offset.
114 */
115 private static final String CLOCK_OFFSET_PROPERTY = "clock_offset"; //$NON-NLS-1$
116
117 /**
118 * If the automatic clock offset is higher than this value, emit a warning.
119 */
120 private static final long CLOCK_OFFSET_THRESHOLD_NS = 500000;
121
122 // ------------------------------------------------------------------------
123 // Attributes
124 // ------------------------------------------------------------------------
125
126 /**
127 * The set of traces that constitute the experiment
128 */
129 private boolean fInitialized = false;
130
131 /**
132 * Lock for synchronization methods. These methods cannot be 'synchronized'
133 * since it makes it impossible to use an event request on the experiment
134 * during synchronization (the request thread would block)
135 */
136 private final Lock fSyncLock = new ReentrantLock();
137
138 // ------------------------------------------------------------------------
139 // Construction
140 // ------------------------------------------------------------------------
141
142 /**
143 * Default constructor. Should not be used directly, but is needed for
144 * extension points.
145 *
146 * @deprecated Do not call this directly (but do not remove it either!)
147 */
148 @Deprecated
149 public TmfExperiment() {
150 super();
151 }
152
153 /**
154 * Constructor of an experiment, taking the type, path, traces,
155 * indexPageSize and resource
156 *
157 * @param type
158 * The event type
159 * @param path
160 * The experiment path
161 * @param traces
162 * The experiment set of traces
163 * @param indexPageSize
164 * The experiment index page size. You can use
165 * {@link TmfExperiment#DEFAULT_INDEX_PAGE_SIZE} for a default
166 * value.
167 * @param resource
168 * The resource associated to the experiment. You can use 'null'
169 * for no resources (tests, etc.)
170 */
171 public TmfExperiment(final Class<? extends ITmfEvent> type,
172 final String path,
173 final ITmfTrace[] traces,
174 final int indexPageSize,
175 final @Nullable IResource resource) {
176 initExperiment(type, path, traces, indexPageSize, resource);
177 }
178
179 @Override
180 protected ITmfTraceIndexer createIndexer(int interval) {
181 if (getCheckpointSize() > 0) {
182 return new TmfBTreeTraceIndexer(this, interval);
183 }
184 return super.createIndexer(interval);
185 }
186
187 /**
188 * Clears the experiment
189 */
190 @Override
191 public synchronized void dispose() {
192
193 // Clean up the index if applicable
194 if (getIndexer() != null) {
195 getIndexer().dispose();
196 }
197
198 super.dispose();
199 }
200
201 // ------------------------------------------------------------------------
202 // ITmfTrace - Initializers
203 // ------------------------------------------------------------------------
204
205 @Override
206 public void initTrace(final IResource resource, final String path, final Class<? extends ITmfEvent> type) {
207 /* Do nothing for experiments */
208 }
209
210 /**
211 * Initialization of an experiment, taking the type, path, traces,
212 * indexPageSize and resource
213 *
214 * @param type
215 * the event type
216 * @param path
217 * the experiment path
218 * @param traces
219 * the experiment set of traces
220 * @param indexPageSize
221 * the experiment index page size
222 * @param resource
223 * the resource associated to the experiment
224 */
225 public void initExperiment(final Class<? extends ITmfEvent> type,
226 final String path,
227 final ITmfTrace[] traces,
228 final int indexPageSize,
229 final @Nullable IResource resource) {
230
231 setCacheSize(indexPageSize);
232 setStreamingInterval(0);
233
234 Multimap<String, ITmfTrace> tracesPerHost = HashMultimap.create();
235
236 // traces have to be set before super.initialize()
237 if (traces != null) {
238 // initialize
239 for (ITmfTrace trace : traces) {
240 if (trace != null) {
241 tracesPerHost.put(trace.getHostId(), trace);
242 addChild(trace);
243 }
244 }
245 }
246
247 try {
248 super.initialize(resource, path, type);
249 } catch (TmfTraceException e) {
250 Activator.logError("Error initializing experiment", e); //$NON-NLS-1$
251 }
252
253 if (resource != null) {
254 synchronizeTraces();
255 }
256
257 /*
258 * For all traces on the same host, if two or more specify different
259 * clock offsets, adjust their clock offset by the average of all of them.
260 *
261 * See https://bugs.eclipse.org/bugs/show_bug.cgi?id=484620
262 */
263 Function<ITmfPropertiesProvider, @Nullable Long> offsetGetter = trace -> {
264 String offset = trace.getProperties().get(CLOCK_OFFSET_PROPERTY);
265 if (offset == null) {
266 return null;
267 }
268 try {
269 return Long.parseLong(offset);
270 } catch (NumberFormatException e) {
271 return null;
272 }
273 };
274
275 for (String host : tracesPerHost.keySet()) {
276 /*
277 * Only attempt to synchronize traces that provide a clock_offset
278 * property.
279 */
280 Collection<ITmfPropertiesProvider> tracesToSynchronize = tracesPerHost.get(host).stream()
281 .filter(trace -> trace instanceof ITmfPropertiesProvider)
282 .map(trace -> (ITmfPropertiesProvider) trace)
283 .filter(trace -> offsetGetter.apply(trace) != null)
284 .collect(Collectors.toList());
285
286 if (tracesToSynchronize.size() < 2) {
287 continue;
288 }
289
290 /* Only synchronize traces if they haven't previously been synchronized */
291 if (tracesToSynchronize.stream()
292 .map(trace -> ((ITmfTrace) trace).getTimestampTransform())
293 .anyMatch(transform -> !transform.equals(TmfTimestampTransform.IDENTITY))) {
294 continue;
295 }
296
297 /* Calculate the average of all clock offsets */
298 BigInteger sum = BigInteger.ZERO;
299 for (ITmfPropertiesProvider trace : tracesToSynchronize) {
300 long offset = checkNotNull(offsetGetter.apply(trace));
301 sum = sum.add(BigInteger.valueOf(offset));
302 }
303 long average = sum.divide(BigInteger.valueOf(tracesToSynchronize.size())).longValue();
304
305 if (average > CLOCK_OFFSET_THRESHOLD_NS) {
306 Activator.logWarning("Average clock correction (" + average + ") is higher than threshold of " + //$NON-NLS-1$ //$NON-NLS-2$
307 CLOCK_OFFSET_THRESHOLD_NS + " ns for experiment " + this.toString()); //$NON-NLS-1$
308 }
309
310 /*
311 * Apply the offset correction to all identified traces, but only if
312 * they do not already have an equivalent one (for example, closing
313 * and re-opening the same experiment should not retrigger building
314 * all supplementary files).
315 */
316 tracesToSynchronize.forEach(t -> {
317 long offset = checkNotNull(offsetGetter.apply(t));
318 long delta = average - offset;
319
320 ITmfTrace trace = (ITmfTrace) t;
321 ITmfTimestampTransform currentTransform = trace.getTimestampTransform();
322 ITmfTimestampTransform newTransform = TimestampTransformFactory.createWithOffset(delta);
323
324 if (!newTransform.equals(currentTransform)) {
325 TmfTraceManager.deleteSupplementaryFiles(trace);
326 trace.setTimestampTransform(newTransform);
327 }
328 });
329 }
330 }
331
332 @Override
333 public IStatus validate(final IProject project, final String path) {
334 return Status.OK_STATUS;
335 }
336
337 // ------------------------------------------------------------------------
338 // Accessors
339 // ------------------------------------------------------------------------
340
341 /**
342 * Get the traces contained in this experiment.
343 *
344 * @return The array of contained traces
345 */
346 public List<@NonNull ITmfTrace> getTraces() {
347 return getChildren(ITmfTrace.class);
348 }
349
350 /**
351 * Returns the timestamp of the event at the requested index. If none,
352 * returns null.
353 *
354 * @param index
355 * the event index (rank)
356 * @return the corresponding event timestamp
357 */
358 public ITmfTimestamp getTimestamp(final int index) {
359 final ITmfContext context = seekEvent(index);
360 final ITmfEvent event = getNext(context);
361 context.dispose();
362 return (event != null) ? event.getTimestamp() : null;
363 }
364
365 // ------------------------------------------------------------------------
366 // Request management
367 // ------------------------------------------------------------------------
368
369 @Override
370 public synchronized ITmfContext armRequest(final ITmfEventRequest request) {
371
372 // Make sure we have something to read from
373 if (getChildren().isEmpty()) {
374 return null;
375 }
376
377 if (!TmfTimestamp.BIG_BANG.equals(request.getRange().getStartTime())
378 && request.getIndex() == 0) {
379 final ITmfContext context = seekEvent(request.getRange().getStartTime());
380 request.setStartIndex((int) context.getRank());
381 return context;
382
383 }
384
385 return seekEvent(request.getIndex());
386 }
387
388 // ------------------------------------------------------------------------
389 // ITmfTrace trace positioning
390 // ------------------------------------------------------------------------
391
392 @Override
393 public synchronized ITmfContext seekEvent(final ITmfLocation location) {
394 // Validate the location
395 if (location != null && !(location instanceof TmfExperimentLocation)) {
396 return null; // Throw an exception?
397 }
398
399 int length = getNbChildren();
400
401 // Initialize the location array if necessary
402 TmfLocationArray locationArray = ((location == null) ? new TmfLocationArray(length) : ((TmfExperimentLocation) location).getLocationInfo());
403
404 ITmfLocation[] locations = locationArray.getLocations();
405 long[] ranks = locationArray.getRanks();
406
407 // Create and populate the context's traces contexts
408 final TmfExperimentContext context = new TmfExperimentContext(length);
409
410 // Position the traces
411 long rank = 0;
412 for (int i = 0; i < length; i++) {
413 // Get the relevant trace attributes
414 final ITmfContext traceContext = ((ITmfTrace) getChild(i)).seekEvent(locations[i]);
415 context.setContext(i, traceContext);
416 traceContext.setRank(ranks[i]);
417 // update location after seek
418 locations[i] = traceContext.getLocation();
419 context.setEvent(i, ((ITmfTrace) getChild(i)).getNext(traceContext));
420 rank += ranks[i];
421 }
422
423 // Finalize context
424 context.setLocation(new TmfExperimentLocation(new TmfLocationArray(locations, ranks)));
425 context.setLastTrace(TmfExperimentContext.NO_TRACE);
426 context.setRank(rank);
427
428 return context;
429 }
430
431 // ------------------------------------------------------------------------
432 // ITmfTrace - SeekEvent operations (returning a trace context)
433 // ------------------------------------------------------------------------
434
435 @Override
436 public ITmfContext seekEvent(final double ratio) {
437 final ITmfContext context = seekEvent(Math.round(ratio * getNbEvents()));
438 return context;
439 }
440
441 @Override
442 public double getLocationRatio(final ITmfLocation location) {
443 if (location instanceof TmfExperimentLocation) {
444 long rank = 0;
445 TmfLocationArray locationArray = ((TmfExperimentLocation) location).getLocationInfo();
446 for (int i = 0; i < locationArray.size(); i++) {
447 rank += locationArray.getRank(i);
448 }
449 return (double) rank / getNbEvents();
450 }
451 return 0.0;
452 }
453
454 @Override
455 public ITmfLocation getCurrentLocation() {
456 // never used
457 return null;
458 }
459
460 // ------------------------------------------------------------------------
461 // ITmfTrace trace positioning
462 // ------------------------------------------------------------------------
463
464 @Override
465 public synchronized ITmfEvent parseEvent(final ITmfContext context) {
466 final ITmfContext tmpContext = seekEvent(context.getLocation());
467 final ITmfEvent event = getNext(tmpContext);
468 return event;
469 }
470
471 @Override
472 public synchronized ITmfEvent getNext(ITmfContext context) {
473
474 // Validate the context
475 if (!(context instanceof TmfExperimentContext)) {
476 return null; // Throw an exception?
477 }
478
479 int length = getNbChildren();
480
481 // Make sure that we have something to read from
482 if (length == 0) {
483 return null;
484 }
485
486 TmfExperimentContext expContext = (TmfExperimentContext) context;
487
488 // If an event was consumed previously, first get the next one from that
489 // trace
490 final int lastTrace = expContext.getLastTrace();
491 if (lastTrace != TmfExperimentContext.NO_TRACE) {
492 final ITmfContext traceContext = expContext.getContext(lastTrace);
493 expContext.setEvent(lastTrace, ((ITmfTrace) getChild(lastTrace)).getNext(traceContext));
494 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
495 }
496
497 // Scan the candidate events and identify the "next" trace to read from
498 int trace = TmfExperimentContext.NO_TRACE;
499 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
500 for (int i = 0; i < length; i++) {
501 final ITmfEvent event = expContext.getEvent(i);
502
503 if (event != null) {
504 final ITmfTimestamp otherTS = event.getTimestamp();
505 if (otherTS.compareTo(timestamp) < 0) {
506 trace = i;
507 timestamp = otherTS;
508 }
509 }
510 }
511
512 ITmfEvent event = null;
513 if (trace != TmfExperimentContext.NO_TRACE) {
514 event = expContext.getEvent(trace);
515 if (event != null) {
516 updateAttributes(expContext, event);
517 expContext.increaseRank();
518 expContext.setLastTrace(trace);
519 final ITmfContext traceContext = expContext.getContext(trace);
520 if (traceContext == null) {
521 throw new IllegalStateException();
522 }
523
524 // Update the experiment location
525 ITmfLocation location = expContext.getLocation();
526 if (location instanceof TmfExperimentLocation) {
527 TmfLocationArray locationArray = new TmfLocationArray(
528 ((TmfExperimentLocation) location).getLocationInfo(),
529 trace, traceContext.getLocation(), traceContext.getRank());
530 expContext.setLocation(new TmfExperimentLocation(locationArray));
531 }
532 }
533 }
534
535 return event;
536 }
537
538 @Override
539 public ITmfTimestamp getInitialRangeOffset() {
540
541 List<ITmfTrace> children = getChildren(ITmfTrace.class);
542
543 if (children.isEmpty()) {
544 return super.getInitialRangeOffset();
545 }
546
547 ITmfTimestamp initTs = TmfTimestamp.BIG_CRUNCH;
548 for (ITmfTrace trace : children) {
549 ITmfTimestamp ts = (trace).getInitialRangeOffset();
550 if (ts.compareTo(initTs) < 0) {
551 initTs = ts;
552 }
553 }
554 return initTs;
555 }
556
557 /**
558 * Get the path to the folder in the supplementary file where
559 * synchronization-related data can be kept so they are not deleted when the
560 * experiment is synchronized. Analysis involved in synchronization can put
561 * their supplementary files in there so they are preserved after
562 * synchronization.
563 *
564 * If the directory does not exist, it will be created. A return value of
565 * <code>null</code> means either the trace resource does not exist or
566 * supplementary resources cannot be kept.
567 *
568 * @param absolute
569 * If <code>true</code>, it returns the absolute path in the file
570 * system, including the supplementary file path. Otherwise, it
571 * returns only the directory name.
572 * @return The path to the folder where synchronization-related
573 * supplementary files can be kept or <code>null</code> if not
574 * available.
575 */
576 public String getSynchronizationFolder(boolean absolute) {
577 /* Set up the path to the synchronization file we'll use */
578 IResource resource = this.getResource();
579 String syncDirectory = null;
580
581 try {
582 /* get the directory where the file will be stored. */
583 if (resource != null) {
584 String fullDirectory = resource.getPersistentProperty(TmfCommonConstants.TRACE_SUPPLEMENTARY_FOLDER);
585 /* Create the synchronization data directory if not present */
586 if (fullDirectory != null) {
587 fullDirectory = fullDirectory + File.separator + SYNCHRONIZATION_DIRECTORY;
588 File syncDir = new File(fullDirectory);
589 syncDir.mkdirs();
590 }
591 if (absolute) {
592 syncDirectory = fullDirectory;
593 } else {
594 syncDirectory = SYNCHRONIZATION_DIRECTORY;
595 }
596 }
597 } catch (CoreException e) {
598 return null;
599 }
600
601 return syncDirectory;
602 }
603
604 /**
605 * Synchronizes the traces of an experiment. By default it only tries to
606 * read a synchronization file if it exists
607 *
608 * @return The synchronization object
609 */
610 public SynchronizationAlgorithm synchronizeTraces() {
611 return synchronizeTraces(false);
612 }
613
614 /**
615 * Synchronizes the traces of an experiment.
616 *
617 * @param doSync
618 * Whether to actually synchronize or just try opening a sync
619 * file
620 * @return The synchronization object
621 */
622 public SynchronizationAlgorithm synchronizeTraces(boolean doSync) {
623 fSyncLock.lock();
624
625 try {
626 String syncDirectory = getSynchronizationFolder(true);
627
628 final File syncFile = (syncDirectory != null) ? new File(syncDirectory + File.separator + SYNCHRONIZATION_FILE_NAME) : null;
629
630 final SynchronizationAlgorithm syncAlgo = SynchronizationManager.synchronizeTraces(syncFile, Collections.singleton(this), doSync);
631
632 final TmfTraceSynchronizedSignal signal = new TmfTraceSynchronizedSignal(this, syncAlgo);
633
634 /* Broadcast in separate thread to prevent deadlock */
635 new Thread() {
636 @Override
637 public void run() {
638 broadcast(signal);
639 }
640 }.start();
641
642 return syncAlgo;
643 } finally {
644 fSyncLock.unlock();
645 }
646 }
647
648 @Override
649 @SuppressWarnings("nls")
650 public synchronized String toString() {
651 return "[TmfExperiment (" + getName() + ")]";
652 }
653
654 // ------------------------------------------------------------------------
655 // Streaming support
656 // ------------------------------------------------------------------------
657
658 private synchronized void initializeStreamingMonitor() {
659
660 if (fInitialized) {
661 return;
662 }
663 fInitialized = true;
664
665 if (getStreamingInterval() == 0) {
666 final ITmfContext context = seekEvent(0);
667 final ITmfEvent event = getNext(context);
668 context.dispose();
669 if (event == null) {
670 return;
671 }
672 final TmfTimeRange timeRange = new TmfTimeRange(event.getTimestamp(), TmfTimestamp.BIG_CRUNCH);
673 final TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(this, this, timeRange);
674
675 // Broadcast in separate thread to prevent deadlock
676 new Thread() {
677 @Override
678 public void run() {
679 broadcast(signal);
680 }
681 }.start();
682 return;
683 }
684
685 final Thread thread = new Thread("Streaming Monitor for experiment " + getName()) { //$NON-NLS-1$
686 private ITmfTimestamp safeTimestamp = null;
687 private ITmfTimestamp lastSafeTimestamp = null;
688 private TmfTimeRange timeRange = null;
689
690 @Override
691 public void run() {
692 while (!executorIsShutdown()) {
693 if (!getIndexer().isIndexing()) {
694 ITmfTimestamp startTimestamp = TmfTimestamp.BIG_CRUNCH;
695 ITmfTimestamp endTimestamp = TmfTimestamp.BIG_BANG;
696
697 for (final ITmfTrace trace : getChildren(ITmfTrace.class)) {
698 if (trace.getStartTime().compareTo(startTimestamp) < 0) {
699 startTimestamp = trace.getStartTime();
700 }
701 if (trace.getStreamingInterval() != 0 && trace.getEndTime().compareTo(endTimestamp) > 0) {
702 endTimestamp = trace.getEndTime();
703 }
704 }
705 ITmfTimestamp safeTs = safeTimestamp;
706 if (safeTs != null && (lastSafeTimestamp == null || safeTs.compareTo(lastSafeTimestamp) > 0)) {
707 timeRange = new TmfTimeRange(startTimestamp, safeTs);
708 lastSafeTimestamp = safeTs;
709 } else {
710 timeRange = null;
711 }
712 safeTimestamp = endTimestamp;
713 if (timeRange != null) {
714 final TmfTraceRangeUpdatedSignal signal = new TmfTraceRangeUpdatedSignal(TmfExperiment.this, TmfExperiment.this, timeRange);
715 broadcast(signal);
716 }
717 }
718 try {
719 Thread.sleep(getStreamingInterval());
720 } catch (final InterruptedException e) {
721 e.printStackTrace();
722 }
723 }
724 }
725 };
726 thread.start();
727 }
728
729 @Override
730 public long getStreamingInterval() {
731 long interval = 0;
732 for (final ITmfTrace trace : getChildren(ITmfTrace.class)) {
733 interval = Math.max(interval, trace.getStreamingInterval());
734 }
735 return interval;
736 }
737
738 // ------------------------------------------------------------------------
739 // Signal handlers
740 // ------------------------------------------------------------------------
741
742 @Override
743 @TmfSignalHandler
744 public void traceOpened(TmfTraceOpenedSignal signal) {
745 if (signal.getTrace() == this) {
746 initializeStreamingMonitor();
747
748 /* Initialize the analysis */
749 MultiStatus status = new MultiStatus(Activator.PLUGIN_ID, IStatus.OK, null, null);
750 status.add(executeAnalysis());
751 if (!status.isOK()) {
752 Activator.log(status);
753 }
754 TmfTraceManager.refreshSupplementaryFiles(this);
755 }
756 }
757
758 @Override
759 public synchronized int getCheckpointSize() {
760 int totalCheckpointSize = 0;
761 try {
762 List<ITmfTrace> children = getChildren(ITmfTrace.class);
763 for (ITmfTrace trace : children) {
764 if (!(trace instanceof ITmfPersistentlyIndexable)) {
765 return 0;
766 }
767
768 ITmfPersistentlyIndexable persistableIndexTrace = (ITmfPersistentlyIndexable) trace;
769 int currentTraceCheckpointSize = persistableIndexTrace.getCheckpointSize();
770 if (currentTraceCheckpointSize <= 0) {
771 return 0;
772 }
773 totalCheckpointSize += currentTraceCheckpointSize;
774 // each entry in the TmfLocationArray has a rank in addition
775 // of the location
776 totalCheckpointSize += 8;
777 }
778 } catch (UnsupportedOperationException e) {
779 return 0;
780 }
781
782 return totalCheckpointSize;
783 }
784
785 @Override
786 public ITmfLocation restoreLocation(ByteBuffer bufferIn) {
787 List<ITmfTrace> children = getChildren(ITmfTrace.class);
788 int length = children.size();
789 ITmfLocation[] locations = new ITmfLocation[length];
790 long[] ranks = new long[length];
791 for (int i = 0; i < length; ++i) {
792 final ITmfTrace trace = children.get(i);
793 locations[i] = ((ITmfPersistentlyIndexable) trace).restoreLocation(bufferIn);
794 ranks[i] = bufferIn.getLong();
795 }
796 TmfLocationArray arr = new TmfLocationArray(locations, ranks);
797 TmfExperimentLocation l = new TmfExperimentLocation(arr);
798 return l;
799 }
800 }
This page took 0.066939 seconds and 5 git commands to generate.