Change certain classes to use ITmfEvent instead of TmfEvent.
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / experiment / TmfExperiment.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.core.experiment;
14
15 import java.util.Collections;
16 import java.util.Vector;
17
18 import org.eclipse.core.resources.IFile;
19 import org.eclipse.core.resources.IProject;
20 import org.eclipse.core.resources.IResource;
21 import org.eclipse.core.runtime.IProgressMonitor;
22 import org.eclipse.core.runtime.IStatus;
23 import org.eclipse.core.runtime.Status;
24 import org.eclipse.core.runtime.jobs.Job;
25 import org.eclipse.linuxtools.tmf.core.component.TmfEventProvider;
26 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
27 import org.eclipse.linuxtools.tmf.core.event.ITmfTimestamp;
28 import org.eclipse.linuxtools.tmf.core.event.TmfTimeRange;
29 import org.eclipse.linuxtools.tmf.core.event.TmfTimestamp;
30 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
31 import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
32 import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
33 import org.eclipse.linuxtools.tmf.core.request.TmfEventRequest;
34 import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
35 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentDisposedSignal;
36 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentRangeUpdatedSignal;
37 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentSelectedSignal;
38 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentUpdatedSignal;
39 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
40 import org.eclipse.linuxtools.tmf.core.signal.TmfTraceUpdatedSignal;
41 import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
42 import org.eclipse.linuxtools.tmf.core.trace.ITmfLocation;
43 import org.eclipse.linuxtools.tmf.core.trace.ITmfTrace;
44 import org.eclipse.linuxtools.tmf.core.trace.TmfCheckpoint;
45 import org.eclipse.linuxtools.tmf.core.trace.TmfContext;
46
47 /**
48 * <b><u>TmfExperiment</u></b>
49 * <p>
50 * TmfExperiment presents a time-ordered, unified view of a set of TmfTraces that are part of a tracing experiment.
51 * <p>
52 */
53 public class TmfExperiment<T extends ITmfEvent> extends TmfEventProvider<T> implements ITmfTrace<T> {
54
55 // ------------------------------------------------------------------------
56 // Attributes
57 // ------------------------------------------------------------------------
58
59 // The currently selected experiment
60 protected static TmfExperiment<?> fCurrentExperiment = null;
61
62 // The set of traces that constitute the experiment
63 protected ITmfTrace<T>[] fTraces;
64
65 // The total number of events
66 protected long fNbEvents;
67
68 // The experiment time range
69 protected TmfTimeRange fTimeRange;
70
71 // The experiment reference timestamp (default: ZERO)
72 protected ITmfTimestamp fEpoch;
73
74 // The experiment index
75 protected Vector<TmfCheckpoint> fCheckpoints = new Vector<TmfCheckpoint>();
76
77 // The current experiment context
78 protected TmfExperimentContext fExperimentContext;
79
80 // Flag to initialize only once
81 private boolean fInitialized = false;
82
83 // The experiment bookmarks file
84 private IFile fBookmarksFile;
85
86 // The properties resource
87 private IResource fResource;
88
89 // ------------------------------------------------------------------------
90 // Constructors
91 // ------------------------------------------------------------------------
92
93 @Override
94 public TmfExperiment<T> clone() throws CloneNotSupportedException {
95 throw new CloneNotSupportedException();
96 }
97
98 @Override
99 public boolean validate(IProject project, String path) {
100 return true;
101 }
102
103 @Override
104 public void initTrace(String name, String path, Class<T> eventType) {
105 }
106
/**
 * Creates an experiment without pre-indexing it.
 *
 * @param type the event type handled by the experiment
 * @param id the experiment id (forwarded to the event provider)
 * @param traces the traces that constitute the experiment
 * @param epoch the experiment reference timestamp
 * @param indexPageSize the number of events between two index checkpoints
 */
public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, ITmfTimestamp epoch, int indexPageSize) {
    // Forward the caller-supplied epoch; it used to be silently replaced by TmfTimestamp.ZERO
    this(type, id, traces, epoch, indexPageSize, false);
}
117
/**
 * Creates an experiment and optionally indexes it right away.
 *
 * @param type the event type handled by the experiment
 * @param id the experiment id (forwarded to the event provider)
 * @param traces the traces that constitute the experiment
 * @param epoch the experiment reference timestamp
 * @param indexPageSize the number of events between two index checkpoints
 * @param preIndexExperiment when {@code true}, index the whole experiment
 *            (waiting for completion) and refresh the time range before returning
 */
public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, ITmfTimestamp epoch, int indexPageSize, boolean preIndexExperiment) {
    super(id, type);

    fTimeRange = TmfTimeRange.NULL_RANGE;
    fIndexPageSize = indexPageSize;
    fEpoch = epoch;
    fTraces = traces;

    if (preIndexExperiment) {
        // Blocking indexing of the full time span, starting at rank 0
        indexExperiment(true, 0, TmfTimeRange.ETERNITY);
        updateTimeRange();
    }
}
131
/**
 * Minimal constructor for subclasses: only initializes the underlying event
 * provider and leaves every experiment attribute at its default value.
 *
 * @param id the experiment id
 * @param type the event type handled by the experiment
 */
protected TmfExperiment(String id, Class<T> type) {
    super(id, type);
}
135
/**
 * Creates an experiment with the {@link TmfTimestamp#ZERO} epoch and the
 * default index page size.
 *
 * @param type the event type handled by the experiment
 * @param id the experiment id
 * @param traces the traces that constitute the experiment
 */
public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces) {
    this(type, id, traces, TmfTimestamp.ZERO, DEFAULT_INDEX_PAGE_SIZE);
}
144
/**
 * Creates an experiment with the {@link TmfTimestamp#ZERO} epoch and a
 * caller-defined index page size.
 *
 * @param type the event type handled by the experiment
 * @param id the experiment id
 * @param traces the traces that constitute the experiment
 * @param indexPageSize the number of events between two index checkpoints
 */
public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, int indexPageSize) {
    this(type, id, traces, TmfTimestamp.ZERO, indexPageSize);
}
154
155 /**
156 * Copy constructor
157 *
158 * @param other
159 */
160 @SuppressWarnings("unchecked")
161 public TmfExperiment(TmfExperiment<T> other) {
162 super(other.getName() + "(clone)", other.fType); //$NON-NLS-1$
163
164 fEpoch = other.fEpoch;
165 fIndexPageSize = other.fIndexPageSize;
166
167 fTraces = new ITmfTrace[other.fTraces.length];
168 for (int trace = 0; trace < other.fTraces.length; trace++) {
169 fTraces[trace] = other.fTraces[trace].copy();
170 }
171
172 fNbEvents = other.fNbEvents;
173 fTimeRange = other.fTimeRange;
174 }
175
176 @Override
177 public TmfExperiment<T> copy() {
178 TmfExperiment<T> experiment = new TmfExperiment<T>(this);
179 TmfSignalManager.deregister(experiment);
180 return experiment;
181 }
182
183 /**
184 * Clears the experiment
185 */
186 @Override
187 @SuppressWarnings("rawtypes")
188 public synchronized void dispose() {
189
190 TmfExperimentDisposedSignal<T> signal = new TmfExperimentDisposedSignal<T>(this, this);
191 broadcast(signal);
192 if (fCurrentExperiment == this) {
193 fCurrentExperiment = null;
194 }
195
196 if (fTraces != null) {
197 for (ITmfTrace trace : fTraces) {
198 trace.dispose();
199 }
200 fTraces = null;
201 }
202 if (fCheckpoints != null) {
203 fCheckpoints.clear();
204 }
205 super.dispose();
206 }
207
208 // ------------------------------------------------------------------------
209 // ITmfTrace
210 // ------------------------------------------------------------------------
211
212 @Override
213 public long getNbEvents() {
214 return fNbEvents;
215 }
216
217 @Override
218 public int getIndexPageSize() {
219 return fIndexPageSize;
220 }
221
222 @Override
223 public TmfTimeRange getTimeRange() {
224 return fTimeRange;
225 }
226
227 @Override
228 public ITmfTimestamp getStartTime() {
229 return fTimeRange.getStartTime();
230 }
231
232 @Override
233 public ITmfTimestamp getEndTime() {
234 return fTimeRange.getEndTime();
235 }
236
237 public Vector<TmfCheckpoint> getCheckpoints() {
238 return fCheckpoints;
239 }
240
241 // ------------------------------------------------------------------------
242 // Accessors
243 // ------------------------------------------------------------------------
244
245 public static void setCurrentExperiment(TmfExperiment<?> experiment) {
246 if ((fCurrentExperiment != null) && (fCurrentExperiment != experiment)) {
247 fCurrentExperiment.dispose();
248 }
249 fCurrentExperiment = experiment;
250 }
251
252 public static TmfExperiment<?> getCurrentExperiment() {
253 return fCurrentExperiment;
254 }
255
256 public ITmfTimestamp getEpoch() {
257 return fEpoch;
258 }
259
260 public ITmfTrace<T>[] getTraces() {
261 return fTraces;
262 }
263
264 /**
265 * Returns the rank of the first event with the requested timestamp. If none, returns the index of the next event
266 * (if any).
267 *
268 * @param timestamp the event timestamp
269 * @return the corresponding event rank
270 */
271 @Override
272 public long getRank(ITmfTimestamp timestamp) {
273 TmfExperimentContext context = seekEvent(timestamp);
274 return context.getRank();
275 }
276
277 /**
278 * Returns the timestamp of the event at the requested index. If none, returns null.
279 *
280 * @param index the event index (rank)
281 * @return the corresponding event timestamp
282 */
283 public ITmfTimestamp getTimestamp(int index) {
284 TmfExperimentContext context = seekEvent(index);
285 ITmfEvent event = getNextEvent(context);
286 return (event != null) ? event.getTimestamp() : null;
287 }
288
289 // ------------------------------------------------------------------------
290 // Operators
291 // ------------------------------------------------------------------------
292
293 /**
294 * Update the global time range
295 */
296 protected void updateTimeRange() {
297 ITmfTimestamp startTime = fTimeRange != TmfTimeRange.NULL_RANGE ? fTimeRange.getStartTime() : TmfTimestamp.BIG_CRUNCH;
298 ITmfTimestamp endTime = fTimeRange != TmfTimeRange.NULL_RANGE ? fTimeRange.getEndTime() : TmfTimestamp.BIG_BANG;
299
300 for (ITmfTrace<T> trace : fTraces) {
301 ITmfTimestamp traceStartTime = trace.getStartTime();
302 if (traceStartTime.compareTo(startTime, true) < 0) {
303 startTime = traceStartTime;
304 }
305 ITmfTimestamp traceEndTime = trace.getEndTime();
306 if (traceEndTime.compareTo(endTime, true) > 0) {
307 endTime = traceEndTime;
308 }
309 }
310 fTimeRange = new TmfTimeRange(startTime, endTime);
311 }
312
313 // ------------------------------------------------------------------------
314 // TmfProvider
315 // ------------------------------------------------------------------------
316 @Override
317 public ITmfContext armRequest(ITmfDataRequest<T> request) {
318 // Tracer.trace("Ctx: Arming request - start");
319 ITmfTimestamp timestamp = (request instanceof ITmfEventRequest<?>) ? ((ITmfEventRequest<T>) request).getRange().getStartTime()
320 : null;
321
322 if (TmfTimestamp.BIG_BANG.equals(timestamp) || (request.getIndex() > 0)) {
323 timestamp = null; // use request index
324 }
325
326 TmfExperimentContext context = null;
327 if (timestamp != null) {
328 // seek by timestamp
329 context = seekEvent(timestamp);
330 ((ITmfEventRequest<T>) request).setStartIndex((int) context.getRank());
331 } else {
332 // Seek by rank
333 if ((fExperimentContext != null) && (fExperimentContext.getRank() == request.getIndex())) {
334 // We are already at the right context -> no need to seek
335 context = fExperimentContext;
336 } else {
337 context = seekEvent(request.getIndex());
338 }
339 }
340 // Tracer.trace("Ctx: Arming request - done");
341 return context;
342 }
343
344 @SuppressWarnings("unchecked")
345 @Override
346 public T getNext(ITmfContext context) {
347 if (context instanceof TmfExperimentContext) {
348 return (T) getNextEvent(context);
349 }
350 return null;
351 }
352
353 // ------------------------------------------------------------------------
354 // ITmfTrace trace positioning
355 // ------------------------------------------------------------------------
356
357 // Returns a brand new context based on the location provided
358 // and initializes the event queues
359 @Override
360 public synchronized TmfExperimentContext seekLocation(ITmfLocation<?> location) {
361 // Validate the location
362 if ((location != null) && !(location instanceof TmfExperimentLocation)) {
363 return null; // Throw an exception?
364 }
365
366 if (fTraces == null) { // experiment has been disposed
367 return null;
368 }
369
370 // Instantiate the location
371 TmfExperimentLocation expLocation = (location == null) ? new TmfExperimentLocation(new TmfLocationArray(
372 new ITmfLocation<?>[fTraces.length]), new long[fTraces.length]) : (TmfExperimentLocation) location.clone();
373
374 // Create and populate the context's traces contexts
375 TmfExperimentContext context = new TmfExperimentContext(fTraces, new ITmfContext[fTraces.length]);
376 // Tracer.trace("Ctx: SeekLocation - start");
377
378 long rank = 0;
379 for (int i = 0; i < fTraces.length; i++) {
380 // Get the relevant trace attributes
381 ITmfLocation<?> traceLocation = expLocation.getLocation().locations[i];
382 long traceRank = expLocation.getRanks()[i];
383
384 // Set the corresponding sub-context
385 context.getContexts()[i] = fTraces[i].seekLocation(traceLocation);
386 context.getContexts()[i].setRank(traceRank);
387 rank += traceRank;
388
389 // Set the trace location and read the corresponding event
390 /* The (TmfContext) cast should be safe since we created 'context'
391 * ourselves higher up. */
392 expLocation.getLocation().locations[i] = (ITmfLocation<? extends Comparable<?>>) (context.getContexts()[i]).getLocation().clone();
393 context.getEvents()[i] = fTraces[i].getNextEvent(context.getContexts()[i]);
394 }
395
396 // Tracer.trace("Ctx: SeekLocation - done");
397
398 // Finalize context
399 context.setLocation(expLocation);
400 context.setLastTrace(TmfExperimentContext.NO_TRACE);
401 context.setRank(rank);
402
403 fExperimentContext = context;
404
405 return context;
406 }
407
408 /*
409 * (non-Javadoc)
410 *
411 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(org.eclipse.linuxtools .tmf.event.TmfTimestamp)
412 */
413 @Override
414 public synchronized TmfExperimentContext seekEvent(ITmfTimestamp timestamp) {
415
416 // Tracer.trace("Ctx: seekEvent(TS) - start");
417
418 if (timestamp == null) {
419 timestamp = TmfTimestamp.BIG_BANG;
420 }
421
422 // First, find the right checkpoint
423 int index = Collections.binarySearch(fCheckpoints, new TmfCheckpoint(timestamp, null));
424
425 // In the very likely case that the checkpoint was not found, bsearch
426 // returns its negated would-be location (not an offset...). From that
427 // index, we can then position the stream and get the event.
428 if (index < 0) {
429 index = Math.max(0, -(index + 2));
430 }
431
432 // Position the experiment at the checkpoint
433 ITmfLocation<?> location;
434 synchronized (fCheckpoints) {
435 if (fCheckpoints.size() > 0) {
436 if (index >= fCheckpoints.size()) {
437 index = fCheckpoints.size() - 1;
438 }
439 location = fCheckpoints.elementAt(index).getLocation();
440 } else {
441 location = null;
442 }
443 }
444
445 TmfExperimentContext context = seekLocation(location);
446 context.setRank((long) index * fIndexPageSize);
447
448 // And locate the event
449 ITmfEvent event = parseEvent(context);
450 while ((event != null) && (event.getTimestamp().compareTo(timestamp, false) < 0)) {
451 getNextEvent(context);
452 event = parseEvent(context);
453 }
454
455 if (event == null) {
456 context.setLocation(null);
457 context.setRank(ITmfContext.UNKNOWN_RANK);
458 }
459
460 return context;
461 }
462
463 /*
464 * (non-Javadoc)
465 *
466 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(long)
467 */
468 @Override
469 public synchronized TmfExperimentContext seekEvent(long rank) {
470
471 // Tracer.trace("Ctx: seekEvent(rank) - start");
472
473 // Position the stream at the previous checkpoint
474 int index = (int) rank / fIndexPageSize;
475 ITmfLocation<?> location;
476 synchronized (fCheckpoints) {
477 if (fCheckpoints.size() == 0) {
478 location = null;
479 } else {
480 if (index >= fCheckpoints.size()) {
481 index = fCheckpoints.size() - 1;
482 }
483 location = fCheckpoints.elementAt(index).getLocation();
484 }
485 }
486
487 TmfExperimentContext context = seekLocation(location);
488 context.setRank((long) index * fIndexPageSize);
489
490 // And locate the event
491 ITmfEvent event = parseEvent(context);
492 long pos = context.getRank();
493 while ((event != null) && (pos++ < rank)) {
494 getNextEvent(context);
495 event = parseEvent(context);
496 }
497
498 if (event == null) {
499 context.setLocation(null);
500 context.setRank(ITmfContext.UNKNOWN_RANK);
501 }
502
503 return context;
504 }
505
506 @Override
507 public TmfContext seekLocation(double ratio) {
508 TmfContext context = seekEvent((long) (ratio * getNbEvents()));
509 return context;
510 }
511
512 @Override
513 public double getLocationRatio(ITmfLocation<?> location) {
514 if (location instanceof TmfExperimentLocation) {
515 return (double) seekLocation(location).getRank() / getNbEvents();
516 }
517 return 0;
518 }
519
520 @Override
521 public ITmfLocation<?> getCurrentLocation() {
522 if (fExperimentContext != null) {
523 return fExperimentContext.getLocation();
524 }
525 return null;
526 }
527
528 // private void dumpContext(TmfExperimentContext context, boolean isBefore) {
529
530 // TmfContext context0 = context.getContexts()[0];
531 // TmfEvent event0 = context.getEvents()[0];
532 // TmfExperimentLocation location0 = (TmfExperimentLocation) context.getLocation();
533 // long rank0 = context.getRank();
534 // int trace = context.getLastTrace();
535 //
536 // StringBuffer result = new StringBuffer("Ctx: " + (isBefore ? "B " : "A "));
537 //
538 // result.append("[Ctx: fLoc= " + context0.getLocation().toString() + ", fRnk= " + context0.getRank() + "] ");
539 // result.append("[Evt: " + event0.getTimestamp().toString() + "] ");
540 // result.append("[Loc: fLoc= " + location0.getLocation()[0].toString() + ", fRnk= " + location0.getRanks()[0] + "] ");
541 // result.append("[Rnk: " + rank0 + "], [Trc: " + trace + "]");
542 // Tracer.trace(result.toString());
543 // }
544
545 /**
546 * Scan the next events from all traces and return the next one in chronological order.
547 *
548 * @param context the trace context
549 * @return the next event
550 */
551 @SuppressWarnings("unchecked")
552 @Override
553 public synchronized ITmfEvent getNextEvent(ITmfContext context) {
554
555 // Validate the context
556 if (!(context instanceof TmfExperimentContext)) {
557 return null; // Throw an exception?
558 }
559
560 if (!context.equals(fExperimentContext)) {
561 // Tracer.trace("Ctx: Restoring context");
562 fExperimentContext = seekLocation(context.getLocation());
563 }
564
565 TmfExperimentContext expContext = (TmfExperimentContext) context;
566
567 // dumpContext(expContext, true);
568
569 // If an event was consumed previously, get the next one from that trace
570 int lastTrace = expContext.getLastTrace();
571 if (lastTrace != TmfExperimentContext.NO_TRACE) {
572 ITmfContext traceContext = expContext.getContexts()[lastTrace];
573 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
574 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
575 }
576
577 // Scan the candidate events and identify the "next" trace to read from
578 ITmfEvent eventArray[] = expContext.getEvents();
579 if (eventArray == null) {
580 return null;
581 }
582 int trace = TmfExperimentContext.NO_TRACE;
583 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
584 if (eventArray.length == 1) {
585 if (eventArray[0] != null) {
586 timestamp = eventArray[0].getTimestamp();
587 trace = 0;
588 }
589 } else {
590 for (int i = 0; i < eventArray.length; i++) {
591 ITmfEvent event = eventArray[i];
592 if ((event != null) && (event.getTimestamp() != null)) {
593 ITmfTimestamp otherTS = event.getTimestamp();
594 if (otherTS.compareTo(timestamp, true) < 0) {
595 trace = i;
596 timestamp = otherTS;
597 }
598 }
599 }
600 }
601 // Update the experiment context and set the "next" event
602 ITmfEvent event = null;
603 if (trace != TmfExperimentContext.NO_TRACE) {
604 updateIndex(expContext, timestamp);
605
606 ITmfContext traceContext = expContext.getContexts()[trace];
607 TmfExperimentLocation expLocation = (TmfExperimentLocation) expContext.getLocation();
608 // expLocation.getLocation()[trace] = traceContext.getLocation().clone();
609 expLocation.getLocation().locations[trace] = (ITmfLocation<? extends Comparable<?>>) traceContext.getLocation().clone();
610
611 // updateIndex(expContext, timestamp);
612
613 expLocation.getRanks()[trace] = traceContext.getRank();
614 expContext.setLastTrace(trace);
615 expContext.updateRank(1);
616 event = expContext.getEvents()[trace];
617 fExperimentContext = expContext;
618 }
619
620 // if (event != null) {
621 // Tracer.trace("Exp: " + (expContext.getRank() - 1) + ": " + event.getTimestamp().toString());
622 // dumpContext(expContext, false);
623 // Tracer.trace("Ctx: Event returned= " + event.getTimestamp().toString());
624 // }
625
626 return event;
627 }
628
629 public synchronized void updateIndex(ITmfContext context, ITmfTimestamp timestamp) {
630 // Build the index as we go along
631 long rank = context.getRank();
632 if (context.isValidRank() && ((rank % fIndexPageSize) == 0)) {
633 // Determine the table position
634 long position = rank / fIndexPageSize;
635 // Add new entry at proper location (if empty)
636 if (fCheckpoints.size() == position) {
637 ITmfLocation<?> location = context.getLocation().clone();
638 fCheckpoints.add(new TmfCheckpoint(timestamp.clone(), location));
639 // System.out.println(this + "[" + (fCheckpoints.size() - 1) + "] " + timestamp + ", "
640 // + location.toString());
641 }
642 }
643 }
644
645 /*
646 * (non-Javadoc)
647 *
648 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#parseEvent(org.eclipse.linuxtools .tmf.trace.TmfContext)
649 */
650 @Override
651 public ITmfEvent parseEvent(ITmfContext context) {
652
653 // Validate the context
654 if (!(context instanceof TmfExperimentContext)) {
655 return null; // Throw an exception?
656 }
657
658 if (!context.equals(fExperimentContext)) {
659 // Tracer.trace("Ctx: Restoring context");
660 seekLocation(context.getLocation());
661 }
662
663 TmfExperimentContext expContext = (TmfExperimentContext) context;
664
665 // If an event was consumed previously, get the next one from that trace
666 int lastTrace = expContext.getLastTrace();
667 if (lastTrace != TmfExperimentContext.NO_TRACE) {
668 ITmfContext traceContext = expContext.getContexts()[lastTrace];
669 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
670 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
671 fExperimentContext = (TmfExperimentContext) context;
672 }
673
674 // Scan the candidate events and identify the "next" trace to read from
675 int trace = TmfExperimentContext.NO_TRACE;
676 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
677 for (int i = 0; i < expContext.getTraces().length; i++) {
678 ITmfEvent event = expContext.getEvents()[i];
679 if ((event != null) && (event.getTimestamp() != null)) {
680 ITmfTimestamp otherTS = event.getTimestamp();
681 if (otherTS.compareTo(timestamp, true) < 0) {
682 trace = i;
683 timestamp = otherTS;
684 }
685 }
686 }
687
688 ITmfEvent event = null;
689 if (trace != TmfExperimentContext.NO_TRACE) {
690 event = expContext.getEvents()[trace];
691 }
692
693 return event;
694 }
695
696 /*
697 * (non-Javadoc)
698 *
699 * @see java.lang.Object#toString()
700 */
701 @Override
702 @SuppressWarnings("nls")
703 public String toString() {
704 return "[TmfExperiment (" + getName() + ")]";
705 }
706
707 // ------------------------------------------------------------------------
708 // Indexing
709 // ------------------------------------------------------------------------
710
711 private synchronized void initializeStreamingMonitor() {
712 if (fInitialized) {
713 return;
714 }
715 fInitialized = true;
716
717 if (getStreamingInterval() == 0) {
718 TmfContext context = seekLocation(null);
719 ITmfEvent event = getNext(context);
720 if (event == null) {
721 return;
722 }
723 TmfTimeRange timeRange = new TmfTimeRange(event.getTimestamp().clone(), TmfTimestamp.BIG_CRUNCH);
724 final TmfExperimentRangeUpdatedSignal signal = new TmfExperimentRangeUpdatedSignal(this, this, timeRange);
725
726 // Broadcast in separate thread to prevent deadlock
727 new Thread() {
728 @Override
729 public void run() {
730 broadcast(signal);
731 }
732 }.start();
733 return;
734 }
735
736 final Thread thread = new Thread("Streaming Monitor for experiment " + getName()) { //$NON-NLS-1$
737 ITmfTimestamp safeTimestamp = null;
738 TmfTimeRange timeRange = null;
739
740 @Override
741 public void run() {
742 while (!fExecutor.isShutdown()) {
743 if (!isIndexingBusy()) {
744 ITmfTimestamp startTimestamp = TmfTimestamp.BIG_CRUNCH;
745 ITmfTimestamp endTimestamp = TmfTimestamp.BIG_BANG;
746 for (ITmfTrace<T> trace : fTraces) {
747 if (trace.getStartTime().compareTo(startTimestamp) < 0) {
748 startTimestamp = trace.getStartTime();
749 }
750 if ((trace.getStreamingInterval() != 0) && (trace.getEndTime().compareTo(endTimestamp) > 0)) {
751 endTimestamp = trace.getEndTime();
752 }
753 }
754 if ((safeTimestamp != null) && (safeTimestamp.compareTo(getTimeRange().getEndTime(), false) > 0)) {
755 timeRange = new TmfTimeRange(startTimestamp, safeTimestamp);
756 } else {
757 timeRange = null;
758 }
759 safeTimestamp = endTimestamp;
760 if (timeRange != null) {
761 TmfExperimentRangeUpdatedSignal signal =
762 new TmfExperimentRangeUpdatedSignal(TmfExperiment.this, TmfExperiment.this, timeRange);
763 broadcast(signal);
764 }
765 }
766 try {
767 Thread.sleep(getStreamingInterval());
768 } catch (InterruptedException e) {
769 e.printStackTrace();
770 }
771 }
772 }
773 };
774 thread.start();
775 }
776
777 /* (non-Javadoc)
778 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getStreamingInterval()
779 */
780 @Override
781 public long getStreamingInterval() {
782 long interval = 0;
783 for (ITmfTrace<T> trace : fTraces) {
784 interval = Math.max(interval, trace.getStreamingInterval());
785 }
786 return interval;
787 }
788
789 /*
790 * The experiment holds the globally ordered events of its set of traces. It is expected to provide access to each
791 * individual event by index i.e. it must be possible to request the Nth event of the experiment.
792 *
793 * The purpose of the index is to keep the information needed to rapidly restore the traces contexts at regular
794 * intervals (every INDEX_PAGE_SIZE event).
795 */
796
797 // The index page size
798 private static final int DEFAULT_INDEX_PAGE_SIZE = 5000;
799 protected int fIndexPageSize;
800 protected boolean fIndexing = false;
801 protected TmfTimeRange fIndexingPendingRange = TmfTimeRange.NULL_RANGE;
802
803 private Integer fEndSynchReference;
804
805 // private static BufferedWriter fEventLog = null;
806 // private static BufferedWriter openLogFile(String filename) {
807 // BufferedWriter outfile = null;
808 // try {
809 // outfile = new BufferedWriter(new FileWriter(filename));
810 // } catch (IOException e) {
811 // e.printStackTrace();
812 // }
813 // return outfile;
814 // }
815
816 protected boolean isIndexingBusy() {
817 synchronized (fCheckpoints) {
818 return fIndexing;
819 }
820 }
821
822 @Override
823 public void indexTrace(boolean waitForCompletion) {
824 if (waitForCompletion) {
825 initializeStreamingMonitor();
826 }
827 }
828
829 @SuppressWarnings("unchecked")
830 private void indexExperiment(boolean waitForCompletion, final int index, final TmfTimeRange timeRange) {
831
832 synchronized (fCheckpoints) {
833 if (fIndexing) {
834 return;
835 }
836 fIndexing = true;
837 }
838
839 final Job job = new Job("Indexing " + getName() + "...") { //$NON-NLS-1$ //$NON-NLS-2$
840 @Override
841 protected IStatus run(IProgressMonitor monitor) {
842 while (!monitor.isCanceled()) {
843 try {
844 Thread.sleep(100);
845 } catch (InterruptedException e) {
846 return Status.OK_STATUS;
847 }
848 }
849 monitor.done();
850 return Status.OK_STATUS;
851 }
852 };
853 job.schedule();
854
855 // fEventLog = openLogFile("TraceEvent.log");
856 // System.out.println(System.currentTimeMillis() + ": Experiment indexing started");
857
858 ITmfEventRequest<ITmfEvent> request = new TmfEventRequest<ITmfEvent>(ITmfEvent.class, timeRange, index, TmfDataRequest.ALL_DATA,
859 fIndexPageSize, ITmfDataRequest.ExecutionType.BACKGROUND) { // PATA FOREGROUND
860
861 // long indexingStart = System.nanoTime();
862
863 ITmfTimestamp startTime = (fTimeRange == TmfTimeRange.NULL_RANGE) ? null : fTimeRange.getStartTime();
864 ITmfTimestamp lastTime = (fTimeRange == TmfTimeRange.NULL_RANGE) ? null : fTimeRange.getEndTime();
865 long initialNbEvents = fNbEvents;
866
867 @Override
868 public void handleStarted() {
869 super.handleStarted();
870 }
871
872 @Override
873 public void handleData(ITmfEvent event) {
874 super.handleData(event);
875 if (event != null) {
876 ITmfTimestamp ts = event.getTimestamp();
877 if (startTime == null) {
878 startTime = ts.clone();
879 }
880 lastTime = ts.clone();
881 if (((getNbRead() % fIndexPageSize) == 1) && (getNbRead() != 1)) {
882 updateExperiment();
883 }
884 }
885 }
886
887 @Override
888 public void handleSuccess() {
889 // long indexingEnd = System.nanoTime();
890
891 // if the end time is a real value then it is the streaming safe time stamp
892 // set the last time to the safe time stamp to prevent unnecessary indexing requests
893 if (getRange().getEndTime() != TmfTimestamp.BIG_CRUNCH) {
894 lastTime = getRange().getEndTime();
895 }
896 updateExperiment();
897 // System.out.println(System.currentTimeMillis() + ": Experiment indexing completed");
898
899 // long average = (indexingEnd - indexingStart) / fNbEvents;
900 // System.out.println(getName() + ": start=" + startTime + ", end=" + lastTime + ", elapsed="
901 // + (indexingEnd * 1.0 - indexingStart) / 1000000000);
902 // System.out.println(getName() + ": nbEvents=" + fNbEvents + " (" + (average / 1000) + "."
903 // + (average % 1000) + " us/evt)");
904 super.handleSuccess();
905 }
906
907 @Override
908 public void handleCompleted() {
909 job.cancel();
910 super.handleCompleted();
911 synchronized (fCheckpoints) {
912 fIndexing = false;
913 if (fIndexingPendingRange != TmfTimeRange.NULL_RANGE) {
914 indexExperiment(false, (int) fNbEvents, fIndexingPendingRange);
915 fIndexingPendingRange = TmfTimeRange.NULL_RANGE;
916 }
917 }
918 }
919
920 private void updateExperiment() {
921 int nbRead = getNbRead();
922 if (startTime != null) {
923 fTimeRange = new TmfTimeRange(startTime, lastTime.clone());
924 }
925 if (nbRead != 0) {
926 // updateTimeRange();
927 // updateNbEvents();
928 fNbEvents = initialNbEvents + nbRead;
929 notifyListeners();
930 }
931 }
932 };
933
934 sendRequest((ITmfDataRequest<T>) request);
935 if (waitForCompletion) {
936 try {
937 request.waitForCompletion();
938 } catch (InterruptedException e) {
939 e.printStackTrace();
940 }
941 }
942 }
943
944 protected void notifyListeners() {
945 broadcast(new TmfExperimentUpdatedSignal(this, this)); // , null));
946 //broadcast(new TmfExperimentRangeUpdatedSignal(this, this, fTimeRange)); // , null));
947 }
948
949 // ------------------------------------------------------------------------
950 // Signal handlers
951 // ------------------------------------------------------------------------
952
953 @TmfSignalHandler
954 public void experimentSelected(TmfExperimentSelectedSignal<T> signal) {
955 TmfExperiment<?> experiment = signal.getExperiment();
956 if (experiment == this) {
957 setCurrentExperiment(experiment);
958 fEndSynchReference = Integer.valueOf(signal.getReference());
959 }
960 }
961
962 @TmfSignalHandler
963 public void endSync(TmfEndSynchSignal signal) {
964 if ((fEndSynchReference != null) && (fEndSynchReference.intValue() == signal.getReference())) {
965 fEndSynchReference = null;
966 initializeStreamingMonitor();
967 }
968
969 }
970
971 @TmfSignalHandler
972 public void experimentUpdated(TmfExperimentUpdatedSignal signal) {
973 }
974
975 @TmfSignalHandler
976 public void experimentRangeUpdated(TmfExperimentRangeUpdatedSignal signal) {
977 if (signal.getExperiment() == this) {
978 indexExperiment(false, (int) fNbEvents, signal.getRange());
979 }
980 }
981
982 @TmfSignalHandler
983 public void traceUpdated(TmfTraceUpdatedSignal signal) {
984 for (ITmfTrace<T> trace : fTraces) {
985 if (trace == signal.getTrace()) {
986 synchronized (fCheckpoints) {
987 if (fIndexing) {
988 if (fIndexingPendingRange == TmfTimeRange.NULL_RANGE) {
989 fIndexingPendingRange = signal.getRange();
990 } else {
991 ITmfTimestamp startTime = fIndexingPendingRange.getStartTime();
992 ITmfTimestamp endTime = fIndexingPendingRange.getEndTime();
993 if (signal.getRange().getStartTime().compareTo(startTime) < 0) {
994 startTime = signal.getRange().getStartTime();
995 }
996 if (signal.getRange().getEndTime().compareTo(endTime) > 0) {
997 endTime = signal.getRange().getEndTime();
998 }
999 fIndexingPendingRange = new TmfTimeRange(startTime, endTime);
1000 }
1001 return;
1002 }
1003 }
1004 indexExperiment(false, (int) fNbEvents, signal.getRange());
1005 return;
1006 }
1007 }
1008 }
1009
1010 @Override
1011 public String getPath() {
1012 // TODO Auto-generated method stub
1013 return null;
1014 }
1015
1016 /**
1017 * Set the file to be used for bookmarks on this experiment
1018 * @param file the bookmarks file
1019 */
1020 public void setBookmarksFile(IFile file) {
1021 fBookmarksFile = file;
1022 }
1023
1024 /**
1025 * Get the file used for bookmarks on this experiment
1026 * @return the bookmarks file or null if none is set
1027 */
1028 public IFile getBookmarksFile() {
1029 return fBookmarksFile;
1030 }
1031
1032 /* (non-Javadoc)
1033 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#setResource(org.eclipse.core.resources.IResource)
1034 */
1035 @Override
1036 public void setResource(IResource resource) {
1037 fResource = resource;
1038 }
1039
1040 /* (non-Javadoc)
1041 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#getResource()
1042 */
1043 @Override
1044 public IResource getResource() {
1045 return fResource;
1046 }
1047 }
This page took 0.055441 seconds and 6 git commands to generate.