Refactor TmfTrace and dependencies - introduce ITmfTraceIndexer
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / experiment / TmfExperiment.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.core.experiment;
14
15 import java.util.Collections;
16 import java.util.Vector;
17
18 import org.eclipse.core.resources.IFile;
19 import org.eclipse.core.resources.IProject;
20 import org.eclipse.core.resources.IResource;
21 import org.eclipse.core.runtime.IProgressMonitor;
22 import org.eclipse.core.runtime.IStatus;
23 import org.eclipse.core.runtime.Status;
24 import org.eclipse.core.runtime.jobs.Job;
25 import org.eclipse.linuxtools.tmf.core.component.TmfEventProvider;
26 import org.eclipse.linuxtools.tmf.core.event.ITmfEvent;
27 import org.eclipse.linuxtools.tmf.core.event.ITmfTimestamp;
28 import org.eclipse.linuxtools.tmf.core.event.TmfTimeRange;
29 import org.eclipse.linuxtools.tmf.core.event.TmfTimestamp;
30 import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
31 import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
32 import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
33 import org.eclipse.linuxtools.tmf.core.request.TmfEventRequest;
34 import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
35 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentDisposedSignal;
36 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentRangeUpdatedSignal;
37 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentSelectedSignal;
38 import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentUpdatedSignal;
39 import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
40 import org.eclipse.linuxtools.tmf.core.signal.TmfTraceUpdatedSignal;
41 import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
42 import org.eclipse.linuxtools.tmf.core.trace.ITmfLocation;
43 import org.eclipse.linuxtools.tmf.core.trace.ITmfTrace;
44 import org.eclipse.linuxtools.tmf.core.trace.TmfCheckpoint;
45 import org.eclipse.linuxtools.tmf.core.trace.TmfContext;
46
47 /**
48 * <b><u>TmfExperiment</u></b>
49 * <p>
50 * TmfExperiment presents a time-ordered, unified view of a set of TmfTraces
51 * that are part of a tracing experiment.
52 * <p>
53 */
54 public class TmfExperiment<T extends ITmfEvent> extends TmfEventProvider<T> implements ITmfTrace<T> {
55
56 // ------------------------------------------------------------------------
57 // Attributes
58 // ------------------------------------------------------------------------
59
60 // The currently selected experiment
61 protected static TmfExperiment<?> fCurrentExperiment = null;
62
63 // The set of traces that constitute the experiment
64 protected ITmfTrace<T>[] fTraces;
65
66 // The total number of events
67 protected long fNbEvents;
68
69 // The experiment time range
70 protected TmfTimeRange fTimeRange;
71
72 // The experiment reference timestamp (default: ZERO)
73 protected ITmfTimestamp fEpoch;
74
75 // The experiment index
76 protected Vector<TmfCheckpoint> fCheckpoints = new Vector<TmfCheckpoint>();
77
78 // The current experiment context
79 protected TmfExperimentContext fExperimentContext;
80
81 // Flag to initialize only once
82 private boolean fInitialized = false;
83
84 // The experiment bookmarks file
85 private IFile fBookmarksFile;
86
87 // The properties resource
88 private IResource fResource;
89
90 // ------------------------------------------------------------------------
91 // Constructors
92 // ------------------------------------------------------------------------
93
94 @Override
95 public TmfExperiment<T> clone() throws CloneNotSupportedException {
96 throw new CloneNotSupportedException();
97 }
98
99 @Override
100 public boolean validate(final IProject project, final String path) {
101 return true;
102 }
103
104 @Override
105 public void initTrace(final IResource resource, final String path, final Class<T> eventType) {
106 fResource = resource;
107 }
108
109 /**
110 * @param type
111 * @param id
112 * @param traces
113 * @param epoch
114 * @param indexPageSize
115 */
116 public TmfExperiment(final Class<T> type, final String id, final ITmfTrace<T>[] traces, final ITmfTimestamp epoch,
117 final int indexPageSize) {
118 this(type, id, traces, TmfTimestamp.ZERO, indexPageSize, false);
119 }
120
121 public TmfExperiment(final Class<T> type, final String id, final ITmfTrace<T>[] traces, final ITmfTimestamp epoch,
122 final int indexPageSize, final boolean preIndexExperiment) {
123 super(id, type);
124
125 fTraces = traces;
126 fEpoch = epoch;
127 fIndexPageSize = indexPageSize;
128 fTimeRange = TmfTimeRange.NULL_RANGE;
129
130 if (preIndexExperiment) {
131 indexExperiment(true, 0, TmfTimeRange.ETERNITY);
132 updateTimeRange();
133 }
134 }
135
136 protected TmfExperiment(final String id, final Class<T> type) {
137 super(id, type);
138 }
139
140 /**
141 * @param type
142 * @param id
143 * @param traces
144 */
145 public TmfExperiment(final Class<T> type, final String id, final ITmfTrace<T>[] traces) {
146 this(type, id, traces, TmfTimestamp.ZERO, DEFAULT_INDEX_PAGE_SIZE);
147 }
148
149 /**
150 * @param type
151 * @param id
152 * @param traces
153 * @param indexPageSize
154 */
155 public TmfExperiment(final Class<T> type, final String id, final ITmfTrace<T>[] traces, final int indexPageSize) {
156 this(type, id, traces, TmfTimestamp.ZERO, indexPageSize);
157 }
158
159 /**
160 * Clears the experiment
161 */
162 @Override
163 @SuppressWarnings("rawtypes")
164 public synchronized void dispose() {
165
166 final TmfExperimentDisposedSignal<T> signal = new TmfExperimentDisposedSignal<T>(this, this);
167 broadcast(signal);
168 if (fCurrentExperiment == this)
169 fCurrentExperiment = null;
170
171 if (fTraces != null) {
172 for (final ITmfTrace trace : fTraces)
173 trace.dispose();
174 fTraces = null;
175 }
176 if (fCheckpoints != null)
177 fCheckpoints.clear();
178 super.dispose();
179 }
180
181 // ------------------------------------------------------------------------
182 // ITmfTrace
183 // ------------------------------------------------------------------------
184
185 @Override
186 public Class<T> getType() {
187 return fType;
188 }
189
190 @Override
191 public long getNbEvents() {
192 return fNbEvents;
193 }
194
195 @Override
196 public int getCacheSize() {
197 return fIndexPageSize;
198 }
199
200 @Override
201 public TmfTimeRange getTimeRange() {
202 return fTimeRange;
203 }
204
205 @Override
206 public ITmfTimestamp getStartTime() {
207 return fTimeRange.getStartTime();
208 }
209
210 @Override
211 public ITmfTimestamp getEndTime() {
212 return fTimeRange.getEndTime();
213 }
214
215 public Vector<TmfCheckpoint> getCheckpoints() {
216 return fCheckpoints;
217 }
218
219 // ------------------------------------------------------------------------
220 // Accessors
221 // ------------------------------------------------------------------------
222
223 public static void setCurrentExperiment(final TmfExperiment<?> experiment) {
224 if (fCurrentExperiment != null && fCurrentExperiment != experiment)
225 fCurrentExperiment.dispose();
226 fCurrentExperiment = experiment;
227 }
228
229 public static TmfExperiment<?> getCurrentExperiment() {
230 return fCurrentExperiment;
231 }
232
233 public ITmfTimestamp getEpoch() {
234 return fEpoch;
235 }
236
237 public ITmfTrace<T>[] getTraces() {
238 return fTraces;
239 }
240
241 /**
242 * Returns the timestamp of the event at the requested index. If none,
243 * returns null.
244 *
245 * @param index the event index (rank)
246 * @return the corresponding event timestamp
247 */
248 public ITmfTimestamp getTimestamp(final int index) {
249 final TmfExperimentContext context = seekEvent(index);
250 final ITmfEvent event = getNextEvent(context);
251 return (event != null) ? event.getTimestamp() : null;
252 }
253
254 // ------------------------------------------------------------------------
255 // Operators
256 // ------------------------------------------------------------------------
257
258 /**
259 * Update the global time range
260 */
261 protected void updateTimeRange() {
262 ITmfTimestamp startTime = fTimeRange != TmfTimeRange.NULL_RANGE ? fTimeRange.getStartTime() : TmfTimestamp.BIG_CRUNCH;
263 ITmfTimestamp endTime = fTimeRange != TmfTimeRange.NULL_RANGE ? fTimeRange.getEndTime() : TmfTimestamp.BIG_BANG;
264
265 for (final ITmfTrace<T> trace : fTraces) {
266 final ITmfTimestamp traceStartTime = trace.getStartTime();
267 if (traceStartTime.compareTo(startTime, true) < 0)
268 startTime = traceStartTime;
269 final ITmfTimestamp traceEndTime = trace.getEndTime();
270 if (traceEndTime.compareTo(endTime, true) > 0)
271 endTime = traceEndTime;
272 }
273 fTimeRange = new TmfTimeRange(startTime, endTime);
274 }
275
276 // ------------------------------------------------------------------------
277 // TmfProvider
278 // ------------------------------------------------------------------------
279
280 @Override
281 public ITmfContext armRequest(final ITmfDataRequest<T> request) {
282 // Tracer.trace("Ctx: Arming request - start");
283 ITmfTimestamp timestamp = (request instanceof ITmfEventRequest<?>) ? ((ITmfEventRequest<T>) request).getRange().getStartTime() : null;
284 if (TmfTimestamp.BIG_BANG.equals(timestamp) || request.getIndex() > 0)
285 timestamp = null; // use request index
286 TmfExperimentContext context = null;
287 if (timestamp != null) {
288 // seek by timestamp
289 context = seekEvent(timestamp);
290 ((ITmfEventRequest<T>) request).setStartIndex((int) context.getRank());
291 } else // Seek by rank
292 if ((fExperimentContext != null) && fExperimentContext.getRank() == request.getIndex())
293 // We are already at the right context -> no need to seek
294 context = fExperimentContext;
295 else
296 context = seekEvent(request.getIndex());
297 // Tracer.trace("Ctx: Arming request - done");
298 return context;
299 }
300
301 @Override
302 @SuppressWarnings("unchecked")
303 public T getNext(final ITmfContext context) {
304 if (context instanceof TmfExperimentContext)
305 return (T) getNextEvent(context);
306 return null;
307 }
308
309 // ------------------------------------------------------------------------
310 // ITmfTrace trace positioning
311 // ------------------------------------------------------------------------
312
313 // Returns a brand new context based on the location provided
314 // and initializes the event queues
315 @Override
316 public synchronized TmfExperimentContext seekLocation(final ITmfLocation<?> location) {
317 // Validate the location
318 if (location != null && !(location instanceof TmfExperimentLocation))
319 return null; // Throw an exception?
320
321 if (fTraces == null)
322 return null;
323
324 // Instantiate the location
325 final TmfExperimentLocation expLocation = (location == null) ? new TmfExperimentLocation(new TmfLocationArray(
326 new ITmfLocation<?>[fTraces.length]), new long[fTraces.length]) : (TmfExperimentLocation) location.clone();
327
328 // Create and populate the context's traces contexts
329 final TmfExperimentContext context = new TmfExperimentContext(fTraces, new ITmfContext[fTraces.length]);
330 // Tracer.trace("Ctx: SeekLocation - start");
331
332 long rank = 0;
333 for (int i = 0; i < fTraces.length; i++) {
334 // Get the relevant trace attributes
335 final ITmfLocation<?> traceLocation = expLocation.getLocation().locations[i];
336 final long traceRank = expLocation.getRanks()[i];
337
338 // Set the corresponding sub-context
339 context.getContexts()[i] = fTraces[i].seekLocation(traceLocation);
340 context.getContexts()[i].setRank(traceRank);
341 rank += traceRank;
342
343 // Set the trace location and read the corresponding event
344 /*
345 * The (TmfContext) cast should be safe since we created 'context'
346 * ourselves higher up.
347 */
348 expLocation.getLocation().locations[i] = context.getContexts()[i].getLocation().clone();
349 context.getEvents()[i] = fTraces[i].getNextEvent(context.getContexts()[i]);
350 }
351
352 // Tracer.trace("Ctx: SeekLocation - done");
353
354 // Finalize context
355 context.setLocation(expLocation);
356 context.setLastTrace(TmfExperimentContext.NO_TRACE);
357 context.setRank(rank);
358
359 fExperimentContext = context;
360
361 return context;
362 }
363
364 /* (non-Javadoc)
365 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(org.eclipse.linuxtools .tmf.event.TmfTimestamp)
366 */
367 @Override
368 public synchronized TmfExperimentContext seekEvent(ITmfTimestamp timestamp) {
369
370 // Tracer.trace("Ctx: seekEvent(TS) - start");
371
372 if (timestamp == null)
373 timestamp = TmfTimestamp.BIG_BANG;
374
375 // First, find the right checkpoint
376 int index = Collections.binarySearch(fCheckpoints, new TmfCheckpoint(timestamp, null));
377
378 // In the very likely case that the checkpoint was not found, bsearch
379 // returns its negated would-be location (not an offset...). From that
380 // index, we can then position the stream and get the event.
381 if (index < 0)
382 index = Math.max(0, -(index + 2));
383
384 // Position the experiment at the checkpoint
385 ITmfLocation<?> location;
386 synchronized (fCheckpoints) {
387 if (fCheckpoints.size() > 0) {
388 if (index >= fCheckpoints.size())
389 index = fCheckpoints.size() - 1;
390 location = fCheckpoints.elementAt(index).getLocation();
391 } else
392 location = null;
393 }
394
395 final TmfExperimentContext context = seekLocation(location);
396 context.setRank((long) index * fIndexPageSize);
397
398 // And locate the event
399 ITmfEvent event = parseEvent(context);
400 while ((event != null) && (event.getTimestamp().compareTo(timestamp, false) < 0)) {
401 getNextEvent(context);
402 event = parseEvent(context);
403 }
404
405 if (event == null) {
406 context.setLocation(null);
407 context.setRank(ITmfContext.UNKNOWN_RANK);
408 }
409
410 return context;
411 }
412
413 /* (non-Javadoc)
414 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(long)
415 */
416 @Override
417 public synchronized TmfExperimentContext seekEvent(final long rank) {
418
419 // Tracer.trace("Ctx: seekEvent(rank) - start");
420
421 // Position the stream at the previous checkpoint
422 int index = (int) rank / fIndexPageSize;
423 ITmfLocation<?> location;
424 synchronized (fCheckpoints) {
425 if (fCheckpoints.size() == 0)
426 location = null;
427 else {
428 if (index >= fCheckpoints.size())
429 index = fCheckpoints.size() - 1;
430 location = fCheckpoints.elementAt(index).getLocation();
431 }
432 }
433
434 final TmfExperimentContext context = seekLocation(location);
435 context.setRank((long) index * fIndexPageSize);
436
437 // And locate the event
438 ITmfEvent event = parseEvent(context);
439 long pos = context.getRank();
440 while ((event != null) && (pos++ < rank)) {
441 getNextEvent(context);
442 event = parseEvent(context);
443 }
444
445 if (event == null) {
446 context.setLocation(null);
447 context.setRank(ITmfContext.UNKNOWN_RANK);
448 }
449
450 return context;
451 }
452
453 @Override
454 public TmfContext seekLocation(final double ratio) {
455 final TmfContext context = seekEvent((long) (ratio * getNbEvents()));
456 return context;
457 }
458
459 @Override
460 public double getLocationRatio(final ITmfLocation<?> location) {
461 if (location instanceof TmfExperimentLocation)
462 return (double) seekLocation(location).getRank() / getNbEvents();
463 return 0;
464 }
465
466 @Override
467 public ITmfLocation<?> getCurrentLocation() {
468 if (fExperimentContext != null)
469 return fExperimentContext.getLocation();
470 return null;
471 }
472
473 // private void dumpContext(TmfExperimentContext context, boolean isBefore) {
474 // TmfContext context0 = context.getContexts()[0];
475 // TmfEvent event0 = context.getEvents()[0];
476 // TmfExperimentLocation location0 = (TmfExperimentLocation) context.getLocation();
477 // long rank0 = context.getRank();
478 // int trace = context.getLastTrace();
479 //
480 // StringBuffer result = new StringBuffer("Ctx: " + (isBefore ? "B " : "A "));
481 //
482 // result.append("[Ctx: fLoc= " + context0.getLocation().toString() + ", fRnk= " + context0.getRank() + "] ");
483 // result.append("[Evt: " + event0.getTimestamp().toString() + "] ");
484 // result.append("[Loc: fLoc= " + location0.getLocation()[0].toString() + ", fRnk= " + location0.getRanks()[0] + "] ");
485 // result.append("[Rnk: " + rank0 + "], [Trc: " + trace + "]");
486 // Tracer.trace(result.toString());
487 // }
488
489 /**
490 * Scan the next events from all traces and return the next one in
491 * chronological order.
492 *
493 * @param context the trace context
494 * @return the next event
495 */
496 @Override
497 public synchronized ITmfEvent getNextEvent(final ITmfContext context) {
498
499 // Validate the context
500 if (!(context instanceof TmfExperimentContext))
501 return null; // Throw an exception?
502
503 if (!context.equals(fExperimentContext))
504 // Tracer.trace("Ctx: Restoring context");
505 fExperimentContext = seekLocation(context.getLocation());
506
507 final TmfExperimentContext expContext = (TmfExperimentContext) context;
508
509 // dumpContext(expContext, true);
510
511 // If an event was consumed previously, get the next one from that trace
512 final int lastTrace = expContext.getLastTrace();
513 if (lastTrace != TmfExperimentContext.NO_TRACE) {
514 final ITmfContext traceContext = expContext.getContexts()[lastTrace];
515 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
516 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
517 }
518
519 // Scan the candidate events and identify the "next" trace to read from
520 final ITmfEvent eventArray[] = expContext.getEvents();
521 if (eventArray == null)
522 return null;
523 int trace = TmfExperimentContext.NO_TRACE;
524 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
525 if (eventArray.length == 1) {
526 if (eventArray[0] != null) {
527 timestamp = eventArray[0].getTimestamp();
528 trace = 0;
529 }
530 } else
531 for (int i = 0; i < eventArray.length; i++) {
532 final ITmfEvent event = eventArray[i];
533 if (event != null && event.getTimestamp() != null) {
534 final ITmfTimestamp otherTS = event.getTimestamp();
535 if (otherTS.compareTo(timestamp, true) < 0) {
536 trace = i;
537 timestamp = otherTS;
538 }
539 }
540 }
541 // Update the experiment context and set the "next" event
542 ITmfEvent event = null;
543 if (trace != TmfExperimentContext.NO_TRACE) {
544 updateIndex(expContext, timestamp);
545
546 final ITmfContext traceContext = expContext.getContexts()[trace];
547 final TmfExperimentLocation expLocation = (TmfExperimentLocation) expContext.getLocation();
548 // expLocation.getLocation()[trace] = traceContext.getLocation().clone();
549 expLocation.getLocation().locations[trace] = traceContext.getLocation().clone();
550
551 // updateIndex(expContext, timestamp);
552
553 expLocation.getRanks()[trace] = traceContext.getRank();
554 expContext.setLastTrace(trace);
555 expContext.increaseRank();
556 event = expContext.getEvents()[trace];
557 fExperimentContext = expContext;
558 }
559
560 // if (event != null) {
561 // Tracer.trace("Exp: " + (expContext.getRank() - 1) + ": " + event.getTimestamp().toString());
562 // dumpContext(expContext, false);
563 // Tracer.trace("Ctx: Event returned= " + event.getTimestamp().toString());
564 // }
565
566 return event;
567 }
568
569 public synchronized void updateIndex(final ITmfContext context, final ITmfTimestamp timestamp) {
570 // Build the index as we go along
571 final long rank = context.getRank();
572 if (context.hasValidRank() && (rank % fIndexPageSize) == 0) {
573 // Determine the table position
574 final long position = rank / fIndexPageSize;
575 // Add new entry at proper location (if empty)
576 if (fCheckpoints.size() == position) {
577 final ITmfLocation<?> location = context.getLocation().clone();
578 fCheckpoints.add(new TmfCheckpoint(timestamp.clone(), location));
579 // System.out.println(this + "[" + (fCheckpoints.size() - 1) + "] " + timestamp + ", "
580 // + location.toString());
581 }
582 }
583 }
584
585 /* (non-Javadoc)
586 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#parseEvent(org.eclipse.linuxtools .tmf.trace.TmfContext)
587 */
588 @Override
589 public ITmfEvent parseEvent(final ITmfContext context) {
590
591 // Validate the context
592 if (!(context instanceof TmfExperimentContext))
593 return null; // Throw an exception?
594
595 if (!context.equals(fExperimentContext))
596 // Tracer.trace("Ctx: Restoring context");
597 seekLocation(context.getLocation());
598
599 final TmfExperimentContext expContext = (TmfExperimentContext) context;
600
601 // If an event was consumed previously, get the next one from that trace
602 final int lastTrace = expContext.getLastTrace();
603 if (lastTrace != TmfExperimentContext.NO_TRACE) {
604 final ITmfContext traceContext = expContext.getContexts()[lastTrace];
605 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
606 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
607 fExperimentContext = (TmfExperimentContext) context;
608 }
609
610 // Scan the candidate events and identify the "next" trace to read from
611 int trace = TmfExperimentContext.NO_TRACE;
612 ITmfTimestamp timestamp = TmfTimestamp.BIG_CRUNCH;
613 for (int i = 0; i < expContext.getTraces().length; i++) {
614 final ITmfEvent event = expContext.getEvents()[i];
615 if (event != null && event.getTimestamp() != null) {
616 final ITmfTimestamp otherTS = event.getTimestamp();
617 if (otherTS.compareTo(timestamp, true) < 0) {
618 trace = i;
619 timestamp = otherTS;
620 }
621 }
622 }
623
624 ITmfEvent event = null;
625 if (trace != TmfExperimentContext.NO_TRACE)
626 event = expContext.getEvents()[trace];
627
628 return event;
629 }
630
631 /* (non-Javadoc)
632 * @see java.lang.Object#toString()
633 */
634 @Override
635 @SuppressWarnings("nls")
636 public String toString() {
637 return "[TmfExperiment (" + getName() + ")]";
638 }
639
640 // ------------------------------------------------------------------------
641 // Indexing
642 // ------------------------------------------------------------------------
643
644 private synchronized void initializeStreamingMonitor() {
645 if (fInitialized)
646 return;
647 fInitialized = true;
648
649 if (getStreamingInterval() == 0) {
650 final TmfContext context = seekLocation(null);
651 final ITmfEvent event = getNext(context);
652 if (event == null)
653 return;
654 final TmfTimeRange timeRange = new TmfTimeRange(event.getTimestamp().clone(), TmfTimestamp.BIG_CRUNCH);
655 final TmfExperimentRangeUpdatedSignal signal = new TmfExperimentRangeUpdatedSignal(this, this, timeRange);
656
657 // Broadcast in separate thread to prevent deadlock
658 new Thread() {
659 @Override
660 public void run() {
661 broadcast(signal);
662 }
663 }.start();
664 return;
665 }
666
667 final Thread thread = new Thread("Streaming Monitor for experiment " + getName()) { ////$NON-NLS-1$
668 private ITmfTimestamp safeTimestamp = null;
669 private TmfTimeRange timeRange = null;
670
671 @Override
672 public void run() {
673 while (!fExecutor.isShutdown()) {
674 if (!isIndexingBusy()) {
675 ITmfTimestamp startTimestamp = TmfTimestamp.BIG_CRUNCH;
676 ITmfTimestamp endTimestamp = TmfTimestamp.BIG_BANG;
677 for (final ITmfTrace<T> trace : fTraces) {
678 if (trace.getStartTime().compareTo(startTimestamp) < 0)
679 startTimestamp = trace.getStartTime();
680 if (trace.getStreamingInterval() != 0 && trace.getEndTime().compareTo(endTimestamp) > 0)
681 endTimestamp = trace.getEndTime();
682 }
683 if (safeTimestamp != null && safeTimestamp.compareTo(getTimeRange().getEndTime(), false) > 0)
684 timeRange = new TmfTimeRange(startTimestamp, safeTimestamp);
685 else
686 timeRange = null;
687 safeTimestamp = endTimestamp;
688 if (timeRange != null) {
689 final TmfExperimentRangeUpdatedSignal signal =
690 new TmfExperimentRangeUpdatedSignal(TmfExperiment.this, TmfExperiment.this, timeRange);
691 broadcast(signal);
692 }
693 }
694 try {
695 Thread.sleep(getStreamingInterval());
696 } catch (final InterruptedException e) {
697 e.printStackTrace();
698 }
699 }
700 }
701 };
702 thread.start();
703 }
704
705 /*
706 * (non-Javadoc)
707 *
708 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getStreamingInterval()
709 */
710 @Override
711 public long getStreamingInterval() {
712 long interval = 0;
713 for (final ITmfTrace<T> trace : fTraces)
714 interval = Math.max(interval, trace.getStreamingInterval());
715 return interval;
716 }
717
718 /*
719 * The experiment holds the globally ordered events of its set of traces. It
720 * is expected to provide access to each individual event by index i.e. it
721 * must be possible to request the Nth event of the experiment.
722 *
723 * The purpose of the index is to keep the information needed to rapidly
724 * restore the traces contexts at regular intervals (every INDEX_PAGE_SIZE
725 * event).
726 */
727
728 // The index page size
729 private static final int DEFAULT_INDEX_PAGE_SIZE = 5000;
730 protected int fIndexPageSize;
731 protected boolean fIndexing = false;
732 protected TmfTimeRange fIndexingPendingRange = TmfTimeRange.NULL_RANGE;
733
734 private Integer fEndSynchReference;
735
736 // private static BufferedWriter fEventLog = null;
737 // private static BufferedWriter openLogFile(String filename) {
738 // BufferedWriter outfile = null;
739 // try {
740 // outfile = new BufferedWriter(new FileWriter(filename));
741 // } catch (IOException e) {
742 // e.printStackTrace();
743 // }
744 // return outfile;
745 // }
746
747 protected boolean isIndexingBusy() {
748 synchronized (fCheckpoints) {
749 return fIndexing;
750 }
751 }
752
753 @SuppressWarnings("unchecked")
754 private void indexExperiment(final boolean waitForCompletion, final int index, final TmfTimeRange timeRange) {
755
756 synchronized (fCheckpoints) {
757 if (fIndexing)
758 return;
759 fIndexing = true;
760 }
761
762 final Job job = new Job("Indexing " + getName() + "...") { //$NON-NLS-1$ //$NON-NLS-2$
763
764 @Override
765 protected IStatus run(final IProgressMonitor monitor) {
766 while (!monitor.isCanceled())
767 try {
768 Thread.sleep(100);
769 } catch (final InterruptedException e) {
770 return Status.OK_STATUS;
771 }
772 monitor.done();
773 return Status.OK_STATUS;
774 }
775 };
776 job.schedule();
777
778 // fEventLog = openLogFile("TraceEvent.log");
779 // System.out.println(System.currentTimeMillis() + ": Experiment indexing started");
780
781 final ITmfEventRequest<ITmfEvent> request = new TmfEventRequest<ITmfEvent>(ITmfEvent.class, timeRange, index,
782 TmfDataRequest.ALL_DATA,
783 fIndexPageSize, ITmfDataRequest.ExecutionType.BACKGROUND) { // PATA
784 // FOREGROUND
785
786 // long indexingStart = System.nanoTime();
787
788 ITmfTimestamp startTime = (fTimeRange == TmfTimeRange.NULL_RANGE) ? null : fTimeRange.getStartTime();
789 ITmfTimestamp lastTime = (fTimeRange == TmfTimeRange.NULL_RANGE) ? null : fTimeRange.getEndTime();
790 long initialNbEvents = fNbEvents;
791
792 @Override
793 public void handleStarted() {
794 super.handleStarted();
795 }
796
797 @Override
798 public void handleData(final ITmfEvent event) {
799 super.handleData(event);
800 if (event != null) {
801 final ITmfTimestamp ts = event.getTimestamp();
802 if (startTime == null)
803 startTime = ts.clone();
804 lastTime = ts.clone();
805 }
806 if ((getNbRead() % fIndexPageSize) == 1 && getNbRead() != 1)
807 updateExperiment();
808 }
809
810 @Override
811 public void handleSuccess() {
812 // long indexingEnd = System.nanoTime();
813
814 // if the end time is a real value then it is the streaming safe
815 // time stamp
816 // set the last time to the safe time stamp to prevent
817 // unnecessary indexing requests
818 if (getRange().getEndTime() != TmfTimestamp.BIG_CRUNCH)
819 lastTime = getRange().getEndTime();
820 updateExperiment();
821 // System.out.println(System.currentTimeMillis() + ": Experiment indexing completed");
822
823 // long average = (indexingEnd - indexingStart) / fNbEvents;
824 // System.out.println(getName() + ": start=" + startTime + ", end=" + lastTime + ", elapsed="
825 // + (indexingEnd * 1.0 - indexingStart) / 1000000000);
826 // System.out.println(getName() + ": nbEvents=" + fNbEvents + " (" + (average / 1000) + "."
827 // + (average % 1000) + " us/evt)");
828 super.handleSuccess();
829 }
830
831 @Override
832 public void handleCompleted() {
833 job.cancel();
834 super.handleCompleted();
835 synchronized (fCheckpoints) {
836 fIndexing = false;
837 if (fIndexingPendingRange != TmfTimeRange.NULL_RANGE) {
838 indexExperiment(false, (int) fNbEvents, fIndexingPendingRange);
839 fIndexingPendingRange = TmfTimeRange.NULL_RANGE;
840 }
841 }
842 }
843
844 private void updateExperiment() {
845 final int nbRead = getNbRead();
846 if (startTime != null)
847 fTimeRange = new TmfTimeRange(startTime, lastTime.clone());
848 if (nbRead != 0) {
849 // updateTimeRange();
850 // updateNbEvents();
851 fNbEvents = initialNbEvents + nbRead;
852 notifyListeners();
853 }
854 }
855 };
856
857 sendRequest((ITmfDataRequest<T>) request);
858 if (waitForCompletion)
859 try {
860 request.waitForCompletion();
861 } catch (final InterruptedException e) {
862 e.printStackTrace();
863 }
864 }
865
866 protected void notifyListeners() {
867 broadcast(new TmfExperimentUpdatedSignal(this, this)); // , null));
868 // broadcast(new TmfExperimentRangeUpdatedSignal(this, this,
869 // fTimeRange)); // , null));
870 }
871
872 // ------------------------------------------------------------------------
873 // Signal handlers
874 // ------------------------------------------------------------------------
875
876 @TmfSignalHandler
877 public void experimentSelected(final TmfExperimentSelectedSignal<T> signal) {
878 final TmfExperiment<?> experiment = signal.getExperiment();
879 if (experiment == this) {
880 setCurrentExperiment(experiment);
881 fEndSynchReference = Integer.valueOf(signal.getReference());
882 }
883 }
884
885 @TmfSignalHandler
886 public void endSync(final TmfEndSynchSignal signal) {
887 if (fEndSynchReference != null && fEndSynchReference.intValue() == signal.getReference()) {
888 fEndSynchReference = null;
889 initializeStreamingMonitor();
890 }
891 }
892
893 @TmfSignalHandler
894 public void experimentUpdated(final TmfExperimentUpdatedSignal signal) {
895 }
896
897 @TmfSignalHandler
898 public void experimentRangeUpdated(final TmfExperimentRangeUpdatedSignal signal) {
899 if (signal.getExperiment() == this)
900 indexExperiment(false, (int) fNbEvents, signal.getRange());
901 }
902
903 @TmfSignalHandler
904 public void traceUpdated(final TmfTraceUpdatedSignal signal) {
905 for (final ITmfTrace<T> trace : fTraces)
906 if (trace == signal.getTrace()) {
907 synchronized (fCheckpoints) {
908 if (fIndexing) {
909 if (fIndexingPendingRange == TmfTimeRange.NULL_RANGE)
910 fIndexingPendingRange = signal.getRange();
911 else {
912 ITmfTimestamp startTime = fIndexingPendingRange.getStartTime();
913 ITmfTimestamp endTime = fIndexingPendingRange.getEndTime();
914 if (signal.getRange().getStartTime().compareTo(startTime) < 0)
915 startTime = signal.getRange().getStartTime();
916 if (signal.getRange().getEndTime().compareTo(endTime) > 0)
917 endTime = signal.getRange().getEndTime();
918 fIndexingPendingRange = new TmfTimeRange(startTime, endTime);
919 }
920 return;
921 }
922 }
923 indexExperiment(false, (int) fNbEvents, signal.getRange());
924 return;
925 }
926 }
927
928 @Override
929 public String getPath() {
930 // TODO Auto-generated method stub
931 return null;
932 }
933
934 /**
935 * Set the file to be used for bookmarks on this experiment
936 *
937 * @param file the bookmarks file
938 */
939 public void setBookmarksFile(final IFile file) {
940 fBookmarksFile = file;
941 }
942
943 /**
944 * Get the file used for bookmarks on this experiment
945 *
946 * @return the bookmarks file or null if none is set
947 */
948 public IFile getBookmarksFile() {
949 return fBookmarksFile;
950 }
951
952 /*
953 * (non-Javadoc)
954 *
955 * @see org.eclipse.linuxtools.tmf.core.trace.ITmfTrace#getResource()
956 */
957 @Override
958 public IResource getResource() {
959 return fResource;
960 }
961 }
This page took 0.079006 seconds and 5 git commands to generate.