Merge branch 'master' into lttng_2_0_control_dev
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / experiment / TmfExperiment.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.core.experiment;
14
15 import java.util.Collections;
16 import java.util.Vector;
17
18 import org.eclipse.core.resources.IFile;
19 import org.eclipse.core.resources.IProject;
20 import org.eclipse.core.resources.IResource;
21 import org.eclipse.core.runtime.IProgressMonitor;
22 import org.eclipse.core.runtime.IStatus;
23 import org.eclipse.core.runtime.Status;
24 import org.eclipse.core.runtime.jobs.Job;
25 import org.eclipse.linuxtools.tmf.core.component.TmfEventProvider;
26 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
27 import org.eclipse.linuxtools.tmf.core.event.ITmfTimestamp;
28 import org.eclipse.linuxtools.tmf.core.event.TmfTimeRange;
29 import org.eclipse.linuxtools.tmf.core.event.TmfTimestamp;
30 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
31 import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
32 import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
33 import org.eclipse.linuxtools.tmf.core.request.TmfEventRequest;
34 import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
35 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentDisposedSignal;
36 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentRangeUpdatedSignal;
37 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentSelectedSignal;
38 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentUpdatedSignal;
39 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
40 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalManager;
41 import org.eclipse.linuxtools.tmf.core.signal.TmfTraceUpdatedSignal;
42 import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
43 import org.eclipse.linuxtools.tmf.core.trace.ITmfLocation;
44 import org.eclipse.linuxtools.tmf.core.trace.ITmfTrace;
45 import org.eclipse.linuxtools.tmf.core.trace.TmfCheckpoint;
46 import org.eclipse.linuxtools.tmf.core.trace.TmfContext;
47
48 /**
49 * <b><u>TmfExperiment</u></b>
50 * <p>
51 * TmfExperiment presents a time-ordered, unified view of a set of TmfTraces that are part of a tracing experiment.
52 * <p>
53 */
54 public class TmfExperiment<T extends ITmfEvent> extends TmfEventProvider<T> implements ITmfTrace<T> {
55
56 // ------------------------------------------------------------------------
57 // Attributes
58 // ------------------------------------------------------------------------
59
60 // The currently selected experiment
61 protected static TmfExperiment<?> fCurrentExperiment = null;
62
63 // The set of traces that constitute the experiment
64 protected ITmfTrace<T>[] fTraces;
65
66 // The total number of events
67 protected long fNbEvents;
68
69 // The experiment time range
70 protected TmfTimeRange fTimeRange;
71
72 // The experiment reference timestamp (default: Zero)
73 protected ITmfTimestamp fEpoch;
74
75 // The experiment index
76 protected Vector<TmfCheckpoint> fCheckpoints = new Vector<TmfCheckpoint>();
77
78 // The current experiment context
79 protected TmfExperimentContext fExperimentContext;
80
81 // Flag to initialize only once
82 private boolean fInitialized = false;
83
84 // The experiment bookmarks file
85 private IFile fBookmarksFile;
86
87 // The properties resource
88 private IResource fResource;
89
90 // ------------------------------------------------------------------------
91 // Constructors
92 // ------------------------------------------------------------------------
93
94 @Override
95 public boolean validate(IProject project, String path) {
96 return true;
97 }
98
99 @Override
100 public void initTrace(String name, String path, Class<T> eventType) {
101 }
102
103 @Override
104 public void initTrace(String name, String path, Class<T> eventType, boolean indexTrace) {
105 if (indexTrace) {
106 initializeStreamingMonitor();
107 }
108 }
109
110 @Override
111 public void initTrace(String name, String path, Class<T> eventType, int cacheSize) {
112 }
113
114 @Override
115 public void initTrace(String name, String path, Class<T> eventType, int cacheSize, boolean indexTrace) {
116 if (indexTrace) {
117 initializeStreamingMonitor();
118 }
119 }
120
121 /**
122 * @param type
123 * @param id
124 * @param traces
125 * @param epoch
126 * @param indexPageSize
127 */
128 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, ITmfTimestamp epoch, int indexPageSize) {
129 this(type, id, traces, TmfTimestamp.ZERO, indexPageSize, false);
130 }
131
132 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, ITmfTimestamp epoch, int indexPageSize, boolean preIndexExperiment) {
133 super(id, type);
134
135 fTraces = traces;
136 fEpoch = epoch;
137 fIndexPageSize = indexPageSize;
138 fTimeRange = TmfTimeRange.NULL_RANGE;
139
140 if (preIndexExperiment) {
141 indexExperiment(true);
142 updateTimeRange();
143 }
144
145 }
146
147 protected TmfExperiment(String id, Class<T> type) {
148 super(id, type);
149 }
150
151 /**
152 * @param type
153 * @param id
154 * @param traces
155 */
156 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces) {
157 this(type, id, traces, TmfTimestamp.ZERO, DEFAULT_INDEX_PAGE_SIZE);
158 }
159
160 /**
161 * @param type
162 * @param id
163 * @param traces
164 * @param indexPageSize
165 */
166 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, int indexPageSize) {
167 this(type, id, traces, TmfTimestamp.ZERO, indexPageSize);
168 }
169
170 /**
171 * Copy constructor
172 *
173 * @param other
174 */
175 @SuppressWarnings("unchecked")
176 public TmfExperiment(TmfExperiment<T> other) {
177 super(other.getName() + "(clone)", other.fType); //$NON-NLS-1$
178
179 fEpoch = other.fEpoch;
180 fIndexPageSize = other.fIndexPageSize;
181
182 fTraces = new ITmfTrace[other.fTraces.length];
183 for (int trace = 0; trace < other.fTraces.length; trace++) {
184 fTraces[trace] = other.fTraces[trace].copy();
185 }
186
187 fNbEvents = other.fNbEvents;
188 fTimeRange = other.fTimeRange;
189 }
190
191 @Override
192 public TmfExperiment<T> copy() {
193 TmfExperiment<T> experiment = new TmfExperiment<T>(this);
194 TmfSignalManager.deregister(experiment);
195 return experiment;
196 }
197
198 /**
199 * Clears the experiment
200 */
201 @Override
202 @SuppressWarnings("rawtypes")
203 public synchronized void dispose() {
204
205 TmfExperimentDisposedSignal<T> signal = new TmfExperimentDisposedSignal<T>(this, this);
206 broadcast(signal);
207 if (fCurrentExperiment == this) {
208 fCurrentExperiment = null;
209 }
210
211 if (fTraces != null) {
212 for (ITmfTrace trace : fTraces) {
213 trace.dispose();
214 }
215 fTraces = null;
216 }
217 if (fCheckpoints != null) {
218 fCheckpoints.clear();
219 }
220 super.dispose();
221 }
222
223 // ------------------------------------------------------------------------
224 // ITmfTrace
225 // ------------------------------------------------------------------------
226
227 @Override
228 public long getNbEvents() {
229 return fNbEvents;
230 }
231
232 @Override
233 public int getCacheSize() {
234 return fIndexPageSize;
235 }
236
237 @Override
238 public TmfTimeRange getTimeRange() {
239 return fTimeRange;
240 }
241
242 @Override
243 public ITmfTimestamp getStartTime() {
244 return fTimeRange.getStartTime();
245 }
246
247 @Override
248 public ITmfTimestamp getEndTime() {
249 return fTimeRange.getEndTime();
250 }
251
252 public Vector<TmfCheckpoint> getCheckpoints() {
253 return fCheckpoints;
254 }
255
256 // ------------------------------------------------------------------------
257 // Accessors
258 // ------------------------------------------------------------------------
259
260 public static void setCurrentExperiment(TmfExperiment<?> experiment) {
261 if (fCurrentExperiment != null && fCurrentExperiment != experiment) {
262 fCurrentExperiment.dispose();
263 }
264 fCurrentExperiment = experiment;
265 }
266
267 public static TmfExperiment<?> getCurrentExperiment() {
268 return fCurrentExperiment;
269 }
270
271 public ITmfTimestamp getEpoch() {
272 return fEpoch;
273 }
274
275 public ITmfTrace<T>[] getTraces() {
276 return fTraces;
277 }
278
279 /**
280 * Returns the rank of the first event with the requested timestamp. If none, returns the index of the next event
281 * (if any).
282 *
283 * @param timestamp
284 * @return
285 */
286 @Override
287 public long getRank(ITmfTimestamp timestamp) {
288 TmfExperimentContext context = seekEvent(timestamp);
289 return context.getRank();
290 }
291
292 /**
293 * Returns the timestamp of the event at the requested index. If none, returns null.
294 *
295 * @param index
296 * @return
297 */
298 public ITmfTimestamp getTimestamp(int index) {
299 TmfExperimentContext context = seekEvent(index);
300 ITmfEvent event = getNextEvent(context);
301 return (event != null) ? event.getTimestamp() : null;
302 }
303
304 // ------------------------------------------------------------------------
305 // Operators
306 // ------------------------------------------------------------------------
307
308 /**
309 * Update the global time range
310 */
311 protected void updateTimeRange() {
312 ITmfTimestamp startTime = fTimeRange != TmfTimeRange.NULL_RANGE ? fTimeRange.getStartTime() : TmfTimestamp.BIG_CRUNCH;
313 ITmfTimestamp endTime = fTimeRange != TmfTimeRange.NULL_RANGE ? fTimeRange.getEndTime() : TmfTimestamp.BIG_BANG;
314
315 for (ITmfTrace<T> trace : fTraces) {
316 ITmfTimestamp traceStartTime = trace.getStartTime();
317 if (traceStartTime.compareTo(startTime, true) < 0)
318 startTime = traceStartTime;
319 ITmfTimestamp traceEndTime = trace.getEndTime();
320 if (traceEndTime.compareTo(endTime, true) > 0)
321 endTime = traceEndTime;
322 }
323 fTimeRange = new TmfTimeRange(startTime, endTime);
324 }
325
326 // ------------------------------------------------------------------------
327 // TmfProvider
328 // ------------------------------------------------------------------------
329 @Override
330 public ITmfContext armRequest(ITmfDataRequest<T> request) {
331 // Tracer.trace("Ctx: Arming request - start");
332 ITmfTimestamp timestamp = (request instanceof ITmfEventRequest<?>) ? ((ITmfEventRequest<T>) request).getRange().getStartTime()
333 : null;
334
335 if (TmfTimestamp.BIG_BANG.equals(timestamp) || request.getIndex() > 0) {
336 timestamp = null; // use request index
337 }
338
339 TmfExperimentContext context = null;
340 if (timestamp != null) {
341 // seek by timestamp
342 context = seekEvent(timestamp);
343 ((ITmfEventRequest<T>) request).setStartIndex((int) context.getRank());
344 } else {
345 // Seek by rank
346 if ((fExperimentContext != null) && fExperimentContext.getRank() == request.getIndex()) {
347 // We are already at the right context -> no need to seek
348 context = fExperimentContext;
349 } else {
350 context = seekEvent(request.getIndex());
351 }
352 }
353 // Tracer.trace("Ctx: Arming request - done");
354 return context;
355 }
356
357 @SuppressWarnings("unchecked")
358 @Override
359 public T getNext(ITmfContext context) {
360 if (context instanceof TmfExperimentContext) {
361 return (T) getNextEvent((TmfExperimentContext) context);
362 }
363 return null;
364 }
365
366 // ------------------------------------------------------------------------
367 // ITmfTrace trace positioning
368 // ------------------------------------------------------------------------
369
370 // Returns a brand new context based on the location provided
371 // and initializes the event queues
372 @Override
373 public synchronized TmfExperimentContext seekLocation(ITmfLocation<?> location) {
374 // Validate the location
375 if (location != null && !(location instanceof TmfExperimentLocation)) {
376 return null; // Throw an exception?
377 }
378
379 if (fTraces == null) { // experiment has been disposed
380 return null;
381 }
382
383 // Instantiate the location
384 TmfExperimentLocation expLocation = (location == null) ? new TmfExperimentLocation(new TmfLocationArray(
385 new ITmfLocation<?>[fTraces.length]), new long[fTraces.length]) : (TmfExperimentLocation) location.clone();
386
387 // Create and populate the context's traces contexts
388 TmfExperimentContext context = new TmfExperimentContext(fTraces, new TmfContext[fTraces.length]);
389 // Tracer.trace("Ctx: SeekLocation - start");
390
391 long rank = 0;
392 for (int i = 0; i < fTraces.length; i++) {
393 // Get the relevant trace attributes
394 ITmfLocation<?> traceLocation = expLocation.getLocation().locations[i];
395 long traceRank = expLocation.getRanks()[i];
396
397 // Set the corresponding sub-context
398 context.getContexts()[i] = fTraces[i].seekLocation(traceLocation);
399 context.getContexts()[i].setRank(traceRank);
400 rank += traceRank;
401
402 // Set the trace location and read the corresponding event
403 /* The (TmfContext) cast should be safe since we created 'context'
404 * ourselves higher up. */
405 expLocation.getLocation().locations[i] = ((TmfContext) context.getContexts()[i]).getLocation().clone();
406 context.getEvents()[i] = fTraces[i].getNextEvent(context.getContexts()[i]);
407 }
408
409 // Tracer.trace("Ctx: SeekLocation - done");
410
411 // Finalize context
412 context.setLocation(expLocation);
413 context.setLastTrace(TmfExperimentContext.NO_TRACE);
414 context.setRank(rank);
415
416 fExperimentContext = context;
417
418 return context;
419 }
420
421 /*
422 * (non-Javadoc)
423 *
424 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(org.eclipse.linuxtools .tmf.event.TmfTimestamp)
425 */
426 @Override
427 public synchronized TmfExperimentContext seekEvent(ITmfTimestamp timestamp) {
428
429 // Tracer.trace("Ctx: seekEvent(TS) - start");
430
431 if (timestamp == null) {
432 timestamp = TmfTimestamp.BIG_BANG;
433 }
434
435 // First, find the right checkpoint
436 int index = Collections.binarySearch(fCheckpoints, new TmfCheckpoint(timestamp, null));
437
438 // In the very likely case that the checkpoint was not found, bsearch
439 // returns its negated would-be location (not an offset...). From that
440 // index, we can then position the stream and get the event.
441 if (index < 0) {
442 index = Math.max(0, -(index + 2));
443 }
444
445 // Position the experiment at the checkpoint
446 ITmfLocation<?> location;
447 synchronized (fCheckpoints) {
448 if (fCheckpoints.size() > 0) {
449 if (index >= fCheckpoints.size()) {
450 index = fCheckpoints.size() - 1;
451 }
452 location = fCheckpoints.elementAt(index).getLocation();
453 } else {
454 location = null;
455 }
456 }
457
458 TmfExperimentContext context = seekLocation(location);
459 context.setRank((long) index * fIndexPageSize);
460
461 // And locate the event
462 ITmfEvent event = parseEvent(context);
463 while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) {
464 getNextEvent(context);
465 event = parseEvent(context);
466 }
467
468 if (event == null) {
469 context.setLocation(null);
470 context.setRank(ITmfContext.UNKNOWN_RANK);
471 }
472
473 return context;
474 }
475
476 /*
477 * (non-Javadoc)
478 *
479 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(long)
480 */
481 @Override
482 public synchronized TmfExperimentContext seekEvent(long rank) {
483
484 // Tracer.trace("Ctx: seekEvent(rank) - start");
485
486 // Position the stream at the previous checkpoint
487 int index = (int) rank / fIndexPageSize;
488 ITmfLocation<?> location;
489 synchronized (fCheckpoints) {
490 if (fCheckpoints.size() == 0) {
491 location = null;
492 } else {
493 if (index >= fCheckpoints.size()) {
494 index = fCheckpoints.size() - 1;
495 }
496 location = fCheckpoints.elementAt(index).getLocation();
497 }
498 }
499
500 TmfExperimentContext context = seekLocation(location);
501 context.setRank((long) index * fIndexPageSize);
502
503 // And locate the event
504 ITmfEvent event = parseEvent(context);
505 long pos = context.getRank();
506 while (event != null && pos++ < rank) {
507 getNextEvent(context);
508 event = parseEvent(context);
509 }
510
511 if (event == null) {
512 context.setLocation(null);
513 context.setRank(ITmfContext.UNKNOWN_RANK);
514 }
515
516 return context;
517 }
518
519 @Override
520 public TmfContext seekLocation(double ratio) {
521 TmfContext context = seekEvent((long) (ratio * getNbEvents()));
522 return context;
523 }
524
525 @Override
526 public double getLocationRatio(ITmfLocation<?> location) {
527 if (location instanceof TmfExperimentLocation) {
528 return (double) seekLocation(location).getRank() / getNbEvents();
529 }
530 return 0;
531 }
532
533 @Override
534 public ITmfLocation<?> getCurrentLocation() {
535 if (fExperimentContext != null) {
536 return fExperimentContext.getLocation();
537 }
538 return null;
539 }
540
541 /**
542 * Scan the next events from all traces and return the next one in chronological order.
543 *
544 * @param context
545 * @return
546 */
547
548 // private void dumpContext(TmfExperimentContext context, boolean isBefore) {
549
550 // TmfContext context0 = context.getContexts()[0];
551 // TmfEvent event0 = context.getEvents()[0];
552 // TmfExperimentLocation location0 = (TmfExperimentLocation) context.getLocation();
553 // long rank0 = context.getRank();
554 // int trace = context.getLastTrace();
555 //
556 // StringBuffer result = new StringBuffer("Ctx: " + (isBefore ? "B " : "A "));
557 //
558 // result.append("[Ctx: fLoc= " + context0.getLocation().toString() + ", fRnk= " + context0.getRank() + "] ");
559 // result.append("[Evt: " + event0.getTimestamp().toString() + "] ");
560 // result.append("[Loc: fLoc= " + location0.getLocation()[0].toString() + ", fRnk= " + location0.getRanks()[0] + "] ");
561 // result.append("[Rnk: " + rank0 + "], [Trc: " + trace + "]");
562 // Tracer.trace(result.toString());
563 // }
564
565 @SuppressWarnings("unchecked")
566 @Override
567 public synchronized ITmfEvent getNextEvent(ITmfContext context) {
568
569 // Validate the context
570 if (!(context instanceof TmfExperimentContext)) {
571 return null; // Throw an exception?
572 }
573
574 if (!context.equals(fExperimentContext)) {
575 // Tracer.trace("Ctx: Restoring context");
576 fExperimentContext = seekLocation(context.getLocation());
577 }
578
579 TmfExperimentContext expContext = (TmfExperimentContext) context;
580
581 // dumpContext(expContext, true);
582
583 // If an event was consumed previously, get the next one from that trace
584 int lastTrace = expContext.getLastTrace();
585 if (lastTrace != TmfExperimentContext.NO_TRACE) {
586 ITmfContext traceContext = expContext.getContexts()[lastTrace];
587 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
588 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
589 }
590
591 // Scan the candidate events and identify the "next" trace to read from
592 ITmfEvent eventArray[] = expContext.getEvents();
593 if (eventArray == null) {
594 return null;
595 }
596 int trace = TmfExperimentContext.NO_TRACE;
597 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
598 if (eventArray.length == 1) {
599 if (eventArray[0] != null) {
600 timestamp = eventArray[0].getTimestamp();
601 trace = 0;
602 }
603 } else {
604 for (int i = 0; i < eventArray.length; i++) {
605 ITmfEvent event = eventArray[i];
606 if (event != null && event.getTimestamp() != null) {
607 ITmfTimestamp otherTS = event.getTimestamp();
608 if (otherTS.compareTo(timestamp, true) < 0) {
609 trace = i;
610 timestamp = otherTS;
611 }
612 }
613 }
614 }
615 // Update the experiment context and set the "next" event
616 ITmfEvent event = null;
617 if (trace != TmfExperimentContext.NO_TRACE) {
618 updateIndex(expContext, timestamp);
619
620 ITmfContext traceContext = expContext.getContexts()[trace];
621 TmfExperimentLocation expLocation = (TmfExperimentLocation) expContext.getLocation();
622 // expLocation.getLocation()[trace] = traceContext.getLocation().clone();
623 expLocation.getLocation().locations[trace] = (ITmfLocation<? extends Comparable<?>>) traceContext.getLocation().clone();
624
625 // updateIndex(expContext, timestamp);
626
627 expLocation.getRanks()[trace] = traceContext.getRank();
628 expContext.setLastTrace(trace);
629 expContext.updateRank(1);
630 event = expContext.getEvents()[trace];
631 fExperimentContext = expContext;
632 }
633
634 // if (event != null) {
635 // Tracer.trace("Exp: " + (expContext.getRank() - 1) + ": " + event.getTimestamp().toString());
636 // dumpContext(expContext, false);
637 // Tracer.trace("Ctx: Event returned= " + event.getTimestamp().toString());
638 // }
639
640 return event;
641 }
642
643 public synchronized void updateIndex(ITmfContext context, ITmfTimestamp timestamp) {
644 // Build the index as we go along
645 long rank = context.getRank();
646 if (context.isValidRank() && (rank % fIndexPageSize) == 0) {
647 // Determine the table position
648 long position = rank / fIndexPageSize;
649 // Add new entry at proper location (if empty)
650 if (fCheckpoints.size() == position) {
651 ITmfLocation<?> location = context.getLocation().clone();
652 fCheckpoints.add(new TmfCheckpoint(timestamp.clone(), location));
653 // System.out.println(this + "[" + (fCheckpoints.size() - 1) + "] " + timestamp + ", "
654 // + location.toString());
655 }
656 }
657 }
658
659 /*
660 * (non-Javadoc)
661 *
662 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#parseEvent(org.eclipse.linuxtools .tmf.trace.TmfContext)
663 */
664 @Override
665 public ITmfEvent parseEvent(ITmfContext context) {
666
667 // Validate the context
668 if (!(context instanceof TmfExperimentContext)) {
669 return null; // Throw an exception?
670 }
671
672 if (!context.equals(fExperimentContext)) {
673 // Tracer.trace("Ctx: Restoring context");
674 seekLocation(context.getLocation());
675 }
676
677 TmfExperimentContext expContext = (TmfExperimentContext) context;
678
679 // If an event was consumed previously, get the next one from that trace
680 int lastTrace = expContext.getLastTrace();
681 if (lastTrace != TmfExperimentContext.NO_TRACE) {
682 ITmfContext traceContext = expContext.getContexts()[lastTrace];
683 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
684 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
685 fExperimentContext = (TmfExperimentContext) context;
686 }
687
688 // Scan the candidate events and identify the "next" trace to read from
689 int trace = TmfExperimentContext.NO_TRACE;
690 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
691 for (int i = 0; i < expContext.getTraces().length; i++) {
692 ITmfEvent event = expContext.getEvents()[i];
693 if (event != null && event.getTimestamp() != null) {
694 ITmfTimestamp otherTS = event.getTimestamp();
695 if (otherTS.compareTo(timestamp, true) < 0) {
696 trace = i;
697 timestamp = otherTS;
698 }
699 }
700 }
701
702 ITmfEvent event = null;
703 if (trace != TmfExperimentContext.NO_TRACE) {
704 event = expContext.getEvents()[trace];
705 }
706
707 return event;
708 }
709
710 /*
711 * (non-Javadoc)
712 *
713 * @see java.lang.Object#toString()
714 */
715 @Override
716 @SuppressWarnings("nls")
717 public String toString() {
718 return "[TmfExperiment (" + getName() + ")]";
719 }
720
721 // ------------------------------------------------------------------------
722 // Indexing
723 // ------------------------------------------------------------------------
724
725 private synchronized void initializeStreamingMonitor() {
726 if (fInitialized) {
727 return;
728 }
729 fInitialized = true;
730
731 if (getStreamingInterval() == 0) {
732 TmfContext context = seekLocation(null);
733 ITmfEvent event = getNext(context);
734 if (event == null) {
735 return;
736 }
737 TmfTimeRange timeRange = new TmfTimeRange(event.getTimestamp().clone(), TmfTimestamp.BIG_CRUNCH);
738 final TmfExperimentRangeUpdatedSignal signal = new TmfExperimentRangeUpdatedSignal(this, this, timeRange);
739
740 // Broadcast in separate thread to prevent deadlock
741 new Thread() {
742 @Override
743 public void run() {
744 broadcast(signal);
745 }
746 }.start();
747 return;
748 }
749
750 final Thread thread = new Thread("Streaming Monitor for experiment " + getName()) { //$NON-NLS-1$
751 ITmfTimestamp safeTimestamp = null;
752 TmfTimeRange timeRange = null;
753
754 @Override
755 public void run() {
756 while (!fExecutor.isShutdown()) {
757 if (!isIndexingBusy()) {
758 ITmfTimestamp startTimestamp = TmfTimestamp.BIG_CRUNCH;
759 ITmfTimestamp endTimestamp = TmfTimestamp.BIG_BANG;
760 for (ITmfTrace<T> trace : fTraces) {
761 if (trace.getStartTime().compareTo(startTimestamp) < 0) {
762 startTimestamp = trace.getStartTime();
763 }
764 if (trace.getStreamingInterval() != 0 && trace.getEndTime().compareTo(endTimestamp) > 0) {
765 endTimestamp = trace.getEndTime();
766 }
767 }
768 if (safeTimestamp != null && safeTimestamp.compareTo(getTimeRange().getEndTime(), false) > 0) {
769 timeRange = new TmfTimeRange(startTimestamp, safeTimestamp);
770 } else {
771 timeRange = null;
772 }
773 safeTimestamp = endTimestamp;
774 if (timeRange != null) {
775 TmfExperimentRangeUpdatedSignal signal =
776 new TmfExperimentRangeUpdatedSignal(TmfExperiment.this, TmfExperiment.this, timeRange);
777 broadcast(signal);
778 }
779 }
780 try {
781 Thread.sleep(getStreamingInterval());
782 } catch (InterruptedException e) {
783 e.printStackTrace();
784 }
785 }
786 }
787 };
788 thread.start();
789 }
790
791 /* (non-Javadoc)
792 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getStreamingInterval()
793 */
794 @Override
795 public long getStreamingInterval() {
796 long interval = 0;
797 for (ITmfTrace<T> trace : fTraces) {
798 interval = Math.max(interval, trace.getStreamingInterval());
799 }
800 return interval;
801 }
802
803 /*
804 * The experiment holds the globally ordered events of its set of traces. It is expected to provide access to each
805 * individual event by index i.e. it must be possible to request the Nth event of the experiment.
806 *
807 * The purpose of the index is to keep the information needed to rapidly restore the traces contexts at regular
808 * intervals (every INDEX_PAGE_SIZE event).
809 */
810
811 // The index page size
812 private static final int DEFAULT_INDEX_PAGE_SIZE = 5000;
813 protected int fIndexPageSize;
814 protected boolean fIndexing = false;
815 protected TmfTimeRange fIndexingPendingRange = TmfTimeRange.NULL_RANGE;
816
817 private Integer fEndSynchReference;
818
819 // private static BufferedWriter fEventLog = null;
820 // private static BufferedWriter openLogFile(String filename) {
821 // BufferedWriter outfile = null;
822 // try {
823 // outfile = new BufferedWriter(new FileWriter(filename));
824 // } catch (IOException e) {
825 // e.printStackTrace();
826 // }
827 // return outfile;
828 // }
829
830 protected boolean isIndexingBusy() {
831 synchronized (fCheckpoints) {
832 return fIndexing;
833 }
834 }
835
836 protected void indexExperiment(boolean waitForCompletion) {
837 indexExperiment(waitForCompletion, 0, TmfTimeRange.ETERNITY);
838 }
839
840 @SuppressWarnings("unchecked")
841 protected void indexExperiment(boolean waitForCompletion, final int index, final TmfTimeRange timeRange) {
842
843 synchronized (fCheckpoints) {
844 if (fIndexing) {
845 return;
846 }
847 fIndexing = true;
848 }
849
850 final Job job = new Job("Indexing " + getName() + "...") { //$NON-NLS-1$ //$NON-NLS-2$
851 @Override
852 protected IStatus run(IProgressMonitor monitor) {
853 while (!monitor.isCanceled()) {
854 try {
855 Thread.sleep(100);
856 } catch (InterruptedException e) {
857 return Status.OK_STATUS;
858 }
859 }
860 monitor.done();
861 return Status.OK_STATUS;
862 }
863 };
864 job.schedule();
865
866 // fEventLog = openLogFile("TraceEvent.log");
867 // System.out.println(System.currentTimeMillis() + ": Experiment indexing started");
868
869 ITmfEventRequest<ITmfEvent> request = new TmfEventRequest<ITmfEvent>(ITmfEvent.class, timeRange, index, TmfDataRequest.ALL_DATA,
870 fIndexPageSize, ITmfDataRequest.ExecutionType.BACKGROUND) { // PATA FOREGROUND
871
872 // long indexingStart = System.nanoTime();
873
874 ITmfTimestamp startTime = (fTimeRange == TmfTimeRange.NULL_RANGE) ? null : fTimeRange.getStartTime();
875 ITmfTimestamp lastTime = (fTimeRange == TmfTimeRange.NULL_RANGE) ? null : fTimeRange.getEndTime();
876 long initialNbEvents = fNbEvents;
877
878 @Override
879 public void handleStarted() {
880 super.handleStarted();
881 }
882
883 @Override
884 public void handleData(ITmfEvent event) {
885 super.handleData(event);
886 if (event != null) {
887 ITmfTimestamp ts = event.getTimestamp();
888 if (startTime == null)
889 startTime = ts.clone();
890 lastTime = ts.clone();
891 if ((getNbRead() % fIndexPageSize) == 1 && getNbRead() != 1) {
892 updateExperiment();
893 }
894 }
895 }
896
897 @Override
898 public void handleSuccess() {
899 // long indexingEnd = System.nanoTime();
900
901 // if the end time is a real value then it is the streaming safe time stamp
902 // set the last time to the safe time stamp to prevent unnecessary indexing requests
903 if (getRange().getEndTime() != TmfTimestamp.BIG_CRUNCH) {
904 lastTime = getRange().getEndTime();
905 }
906 updateExperiment();
907 // System.out.println(System.currentTimeMillis() + ": Experiment indexing completed");
908
909 // long average = (indexingEnd - indexingStart) / fNbEvents;
910 // System.out.println(getName() + ": start=" + startTime + ", end=" + lastTime + ", elapsed="
911 // + (indexingEnd * 1.0 - indexingStart) / 1000000000);
912 // System.out.println(getName() + ": nbEvents=" + fNbEvents + " (" + (average / 1000) + "."
913 // + (average % 1000) + " us/evt)");
914 super.handleSuccess();
915 }
916
917 @Override
918 public void handleCompleted() {
919 job.cancel();
920 super.handleCompleted();
921 synchronized (fCheckpoints) {
922 fIndexing = false;
923 if (fIndexingPendingRange != TmfTimeRange.NULL_RANGE) {
924 indexExperiment(false, (int) fNbEvents, fIndexingPendingRange);
925 fIndexingPendingRange = TmfTimeRange.NULL_RANGE;
926 }
927 }
928 }
929
930 private void updateExperiment() {
931 int nbRead = getNbRead();
932 if (startTime != null) {
933 fTimeRange = new TmfTimeRange(startTime, lastTime.clone());
934 }
935 if (nbRead != 0) {
936 // updateTimeRange();
937 // updateNbEvents();
938 fNbEvents = initialNbEvents + nbRead;
939 notifyListeners();
940 }
941 }
942 };
943
944 sendRequest((ITmfDataRequest<T>) request);
945 if (waitForCompletion)
946 try {
947 request.waitForCompletion();
948 } catch (InterruptedException e) {
949 e.printStackTrace();
950 }
951 }
952
953 protected void notifyListeners() {
954 broadcast(new TmfExperimentUpdatedSignal(this, this)); // , null));
955 //broadcast(new TmfExperimentRangeUpdatedSignal(this, this, fTimeRange)); // , null));
956 }
957
958 // ------------------------------------------------------------------------
959 // Signal handlers
960 // ------------------------------------------------------------------------
961
962 @TmfSignalHandler
963 public void experimentSelected(TmfExperimentSelectedSignal<T> signal) {
964 TmfExperiment<?> experiment = signal.getExperiment();
965 if (experiment == this) {
966 setCurrentExperiment(experiment);
967 fEndSynchReference = Integer.valueOf(signal.getReference());
968 }
969 }
970
971 @TmfSignalHandler
972 public void endSync(TmfEndSynchSignal signal) {
973 if (fEndSynchReference != null && fEndSynchReference.intValue() == signal.getReference()) {
974 fEndSynchReference = null;
975 initializeStreamingMonitor();
976 }
977
978 }
979
980 @TmfSignalHandler
981 public void experimentUpdated(TmfExperimentUpdatedSignal signal) {
982 }
983
984 @TmfSignalHandler
985 public void experimentRangeUpdated(TmfExperimentRangeUpdatedSignal signal) {
986 if (signal.getExperiment() == this) {
987 indexExperiment(false, (int) fNbEvents, signal.getRange());
988 }
989 }
990
991 @TmfSignalHandler
992 public void traceUpdated(TmfTraceUpdatedSignal signal) {
993 for (ITmfTrace<T> trace : fTraces) {
994 if (trace == signal.getTrace()) {
995 synchronized (fCheckpoints) {
996 if (fIndexing) {
997 if (fIndexingPendingRange == TmfTimeRange.NULL_RANGE) {
998 fIndexingPendingRange = signal.getRange();
999 } else {
1000 ITmfTimestamp startTime = fIndexingPendingRange.getStartTime();
1001 ITmfTimestamp endTime = fIndexingPendingRange.getEndTime();
1002 if (signal.getRange().getStartTime().compareTo(startTime) < 0) {
1003 startTime = signal.getRange().getStartTime();
1004 }
1005 if (signal.getRange().getEndTime().compareTo(endTime) > 0) {
1006 endTime = signal.getRange().getEndTime();
1007 }
1008 fIndexingPendingRange = new TmfTimeRange(startTime, endTime);
1009 }
1010 return;
1011 }
1012 }
1013 indexExperiment(false, (int) fNbEvents, signal.getRange());
1014 return;
1015 }
1016 }
1017 }
1018
1019 @Override
1020 public String getPath() {
1021 // TODO Auto-generated method stub
1022 return null;
1023 }
1024
1025 /**
1026 * Set the file to be used for bookmarks on this experiment
1027 * @param file the bookmarks file
1028 */
1029 public void setBookmarksFile(IFile file) {
1030 fBookmarksFile = file;
1031 }
1032
1033 /**
1034 * Get the file used for bookmarks on this experiment
1035 * @return the bookmarks file or null if none is set
1036 */
1037 public IFile getBookmarksFile() {
1038 return fBookmarksFile;
1039 }
1040
1041 /* (non-Javadoc)
1042 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#setResource(org.eclipse.core.resources.IResource)
1043 */
1044 @Override
1045 public void setResource(IResource resource) {
1046 fResource = resource;
1047 }
1048
1049 /* (non-Javadoc)
1050 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#getResource()
1051 */
1052 @Override
1053 public IResource getResource() {
1054 return fResource;
1055 }
1056 }
This page took 0.068188 seconds and 6 git commands to generate.