Fix for streaming and reconnection. Added standalone releng for lttng.
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf.core / src / org / eclipse / linuxtools / tmf / core / experiment / TmfExperiment.java
CommitLineData
8c8bf09f
ASL
1/*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
6c13869b 13package org.eclipse.linuxtools.tmf.core.experiment;
8c8bf09f 14
12c155f5 15import java.io.FileNotFoundException;
9f584e4c 16import java.util.Collections;
8c8bf09f
ASL
17import java.util.Vector;
18
12c155f5 19import org.eclipse.core.resources.IProject;
05bd3318
FC
20import org.eclipse.core.runtime.IProgressMonitor;
21import org.eclipse.core.runtime.IStatus;
22import org.eclipse.core.runtime.Status;
23import org.eclipse.core.runtime.jobs.Job;
6c13869b
FC
24import org.eclipse.linuxtools.tmf.core.component.TmfEventProvider;
25import org.eclipse.linuxtools.tmf.core.event.TmfEvent;
26import org.eclipse.linuxtools.tmf.core.event.TmfTimeRange;
27import org.eclipse.linuxtools.tmf.core.event.TmfTimestamp;
28import org.eclipse.linuxtools.tmf.core.request.ITmfDataRequest;
29import org.eclipse.linuxtools.tmf.core.request.ITmfEventRequest;
30import org.eclipse.linuxtools.tmf.core.request.TmfDataRequest;
31import org.eclipse.linuxtools.tmf.core.request.TmfEventRequest;
1b70b6dc 32import org.eclipse.linuxtools.tmf.core.signal.TmfEndSynchSignal;
6c13869b
FC
33import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentDisposedSignal;
34import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentRangeUpdatedSignal;
35import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentSelectedSignal;
36import org.eclipse.linuxtools.tmf.core.signal.TmfExperimentUpdatedSignal;
37import org.eclipse.linuxtools.tmf.core.signal.TmfSignalHandler;
38import org.eclipse.linuxtools.tmf.core.signal.TmfSignalManager;
39import org.eclipse.linuxtools.tmf.core.signal.TmfTraceUpdatedSignal;
40import org.eclipse.linuxtools.tmf.core.trace.ITmfContext;
41import org.eclipse.linuxtools.tmf.core.trace.ITmfLocation;
42import org.eclipse.linuxtools.tmf.core.trace.ITmfTrace;
43import org.eclipse.linuxtools.tmf.core.trace.TmfCheckpoint;
44import org.eclipse.linuxtools.tmf.core.trace.TmfContext;
8c8bf09f
ASL
45
46/**
47 * <b><u>TmfExperiment</u></b>
48 * <p>
12c155f5 49 * TmfExperiment presents a time-ordered, unified view of a set of TmfTraces that are part of a tracing experiment.
8c8bf09f
ASL
50 * <p>
51 */
12c155f5 52public class TmfExperiment<T extends TmfEvent> extends TmfEventProvider<T> implements ITmfTrace<T> {
8c8bf09f
ASL
53
54 // ------------------------------------------------------------------------
55 // Attributes
56 // ------------------------------------------------------------------------
57
a79913eb 58 // The currently selected experiment
82e04272 59 protected static TmfExperiment<?> fCurrentExperiment = null;
e31e01e8 60
a79913eb 61 // The set of traces that constitute the experiment
12c155f5 62 protected ITmfTrace<T>[] fTraces;
8c8bf09f
ASL
63
64 // The total number of events
82e04272 65 protected long fNbEvents;
8c8bf09f
ASL
66
67 // The experiment time range
82e04272 68 protected TmfTimeRange fTimeRange;
8c8bf09f 69
9b635e61 70 // The experiment reference timestamp (default: Zero)
82e04272 71 protected TmfTimestamp fEpoch;
8c8bf09f 72
a79913eb 73 // The experiment index
82e04272 74 protected Vector<TmfCheckpoint> fCheckpoints = new Vector<TmfCheckpoint>();
9f584e4c 75
f6b14ce2
FC
76 // The current experiment context
77 protected TmfExperimentContext fExperimentContext;
a79913eb 78
8c8bf09f
ASL
79 // ------------------------------------------------------------------------
80 // Constructors
81 // ------------------------------------------------------------------------
82
12c155f5
FC
83 @Override
84 public boolean validate(IProject project, String path) {
85 return true;
86 }
87
88 @Override
89 public void initTrace(String path, Class<T> eventType) throws FileNotFoundException {
90 }
91
92 @Override
93 public void initTrace(String path, Class<T> eventType, boolean indexTrace) throws FileNotFoundException {
94 }
95
96 @Override
97 public void initTrace(String path, Class<T> eventType, int cacheSize) throws FileNotFoundException {
98 }
99
100 @Override
101 public void initTrace(String path, Class<T> eventType, int cacheSize, boolean indexTrace) throws FileNotFoundException {
102 }
103
96c6806f
PT
104 @Override
105 public void initTrace(String path, Class<T> eventType, int cacheSize, boolean indexTrace, String name) throws FileNotFoundException {
106 }
107
8c8bf09f
ASL
108 /**
109 * @param type
110 * @param id
111 * @param traces
112 * @param epoch
113 * @param indexPageSize
114 */
12c155f5 115 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, TmfTimestamp epoch, int indexPageSize) {
045df77d 116 this(type, id, traces, TmfTimestamp.Zero, indexPageSize, false);
a79913eb 117 }
cb866e08 118
12c155f5 119 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, TmfTimestamp epoch, int indexPageSize, boolean preIndexExperiment) {
a79913eb 120 super(id, type);
8c8bf09f 121
a79913eb
FC
122 fTraces = traces;
123 fEpoch = epoch;
124 fIndexPageSize = indexPageSize;
125 fTimeRange = TmfTimeRange.Null;
8c8bf09f 126
a79913eb
FC
127 if (preIndexExperiment) {
128 indexExperiment(true);
129 updateTimeRange();
130 }
cb866e08 131
a79913eb 132 }
8c8bf09f 133
82e04272
FC
134 protected TmfExperiment(String id, Class<T> type) {
135 super(id, type);
a79913eb 136 }
82e04272 137
8c8bf09f
ASL
138 /**
139 * @param type
140 * @param id
141 * @param traces
142 */
12c155f5 143 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces) {
8c8bf09f
ASL
144 this(type, id, traces, TmfTimestamp.Zero, DEFAULT_INDEX_PAGE_SIZE);
145 }
146
147 /**
148 * @param type
149 * @param id
150 * @param traces
151 * @param indexPageSize
152 */
12c155f5 153 public TmfExperiment(Class<T> type, String id, ITmfTrace<T>[] traces, int indexPageSize) {
8c8bf09f
ASL
154 this(type, id, traces, TmfTimestamp.Zero, indexPageSize);
155 }
a79913eb 156
f6b14ce2
FC
157 /**
158 * Copy constructor
a79913eb 159 *
f6b14ce2
FC
160 * @param other
161 */
12c155f5 162 @SuppressWarnings("unchecked")
ce785d7d 163 public TmfExperiment(TmfExperiment<T> other) {
a79913eb
FC
164 super(other.getName() + "(clone)", other.fType); //$NON-NLS-1$
165
166 fEpoch = other.fEpoch;
167 fIndexPageSize = other.fIndexPageSize;
168
169 fTraces = new ITmfTrace[other.fTraces.length];
170 for (int trace = 0; trace < other.fTraces.length; trace++) {
12c155f5 171 fTraces[trace] = other.fTraces[trace].copy();
a79913eb
FC
172 }
173
174 fNbEvents = other.fNbEvents;
175 fTimeRange = other.fTimeRange;
176 }
177
178 @Override
12c155f5 179 public TmfExperiment<T> copy() {
a79913eb
FC
180 TmfExperiment<T> experiment = new TmfExperiment<T>(this);
181 TmfSignalManager.deregister(experiment);
182 return experiment;
183 }
184
8c8bf09f 185 /**
ff4ed569 186 * Clears the experiment
8c8bf09f
ASL
187 */
188 @Override
12c155f5 189 @SuppressWarnings("rawtypes")
a79913eb
FC
190 public synchronized void dispose() {
191
192 TmfExperimentDisposedSignal<T> signal = new TmfExperimentDisposedSignal<T>(this, this);
193 broadcast(signal);
194
195 if (fTraces != null) {
196 for (ITmfTrace trace : fTraces) {
197 trace.dispose();
198 }
199 fTraces = null;
200 }
201 if (fCheckpoints != null) {
202 fCheckpoints.clear();
203 }
2fb2eb37 204 super.dispose();
8c8bf09f
ASL
205 }
206
9f584e4c 207 // ------------------------------------------------------------------------
cbd4ad82 208 // ITmfTrace
9f584e4c
FC
209 // ------------------------------------------------------------------------
210
a79913eb
FC
211 @Override
212 public long getNbEvents() {
213 return fNbEvents;
214 }
9f584e4c 215
d4011df2 216 @Override
a79913eb 217 public int getCacheSize() {
54d55ced
FC
218 return fIndexPageSize;
219 }
220
a79913eb
FC
221 @Override
222 public TmfTimeRange getTimeRange() {
223 return fTimeRange;
224 }
9f584e4c 225
a79913eb
FC
226 @Override
227 public TmfTimestamp getStartTime() {
228 return fTimeRange.getStartTime();
229 }
9f584e4c 230
a79913eb
FC
231 @Override
232 public TmfTimestamp getEndTime() {
233 return fTimeRange.getEndTime();
234 }
9f584e4c 235
54d55ced 236 public Vector<TmfCheckpoint> getCheckpoints() {
a79913eb 237 return fCheckpoints;
54d55ced
FC
238 }
239
8c8bf09f 240 // ------------------------------------------------------------------------
e31e01e8 241 // Accessors
8c8bf09f
ASL
242 // ------------------------------------------------------------------------
243
c1c69938 244 public static void setCurrentExperiment(TmfExperiment<?> experiment) {
a79913eb 245 fCurrentExperiment = experiment;
f6b14ce2
FC
246 }
247
e31e01e8 248 public static TmfExperiment<?> getCurrentExperiment() {
a79913eb 249 return fCurrentExperiment;
8c8bf09f
ASL
250 }
251
8c8bf09f 252 public TmfTimestamp getEpoch() {
a79913eb 253 return fEpoch;
8c8bf09f
ASL
254 }
255
12c155f5 256 public ITmfTrace<T>[] getTraces() {
a79913eb 257 return fTraces;
8c8bf09f
ASL
258 }
259
260 /**
12c155f5
FC
261 * Returns the rank of the first event with the requested timestamp. If none, returns the index of the next event
262 * (if any).
a79913eb 263 *
85fb0e54 264 * @param timestamp
8c8bf09f
ASL
265 * @return
266 */
d4011df2 267 @Override
a79913eb
FC
268 public long getRank(TmfTimestamp timestamp) {
269 TmfExperimentContext context = seekEvent(timestamp);
270 return context.getRank();
8c8bf09f
ASL
271 }
272
273 /**
12c155f5 274 * Returns the timestamp of the event at the requested index. If none, returns null.
a79913eb 275 *
8c8bf09f
ASL
276 * @param index
277 * @return
278 */
279 public TmfTimestamp getTimestamp(int index) {
a79913eb
FC
280 TmfExperimentContext context = seekEvent(index);
281 TmfEvent event = getNextEvent(context);
282 return (event != null) ? event.getTimestamp() : null;
8c8bf09f
ASL
283 }
284
285 // ------------------------------------------------------------------------
286 // Operators
287 // ------------------------------------------------------------------------
288
8c8bf09f
ASL
289 /**
290 * Update the global time range
291 */
a79913eb
FC
292 protected void updateTimeRange() {
293 TmfTimestamp startTime = fTimeRange != TmfTimeRange.Null ? fTimeRange.getStartTime() : TmfTimestamp.BigCrunch;
294 TmfTimestamp endTime = fTimeRange != TmfTimeRange.Null ? fTimeRange.getEndTime() : TmfTimestamp.BigBang;
295
12c155f5 296 for (ITmfTrace<T> trace : fTraces) {
a79913eb
FC
297 TmfTimestamp traceStartTime = trace.getStartTime();
298 if (traceStartTime.compareTo(startTime, true) < 0)
299 startTime = traceStartTime;
300 TmfTimestamp traceEndTime = trace.getEndTime();
301 if (traceEndTime.compareTo(endTime, true) > 0)
302 endTime = traceEndTime;
303 }
304 fTimeRange = new TmfTimeRange(startTime, endTime);
8c8bf09f
ASL
305 }
306
307 // ------------------------------------------------------------------------
308 // TmfProvider
309 // ------------------------------------------------------------------------
a79913eb
FC
310 @Override
311 public ITmfContext armRequest(ITmfDataRequest<T> request) {
9b635e61 312// Tracer.trace("Ctx: Arming request - start");
12c155f5
FC
313 TmfTimestamp timestamp = (request instanceof ITmfEventRequest<?>) ? ((ITmfEventRequest<T>) request).getRange().getStartTime()
314 : null;
a79913eb
FC
315
316 if (TmfTimestamp.BigBang.equals(timestamp) || request.getIndex() > 0) {
317 timestamp = null; // use request index
318 }
319
320 TmfExperimentContext context = null;
321 if (timestamp != null) {
322 // seek by timestamp
323 context = seekEvent(timestamp);
324 ((ITmfEventRequest<T>) request).setStartIndex((int) context.getRank());
325 } else {
326 // Seek by rank
327 if ((fExperimentContext != null) && fExperimentContext.getRank() == request.getIndex()) {
328 // We are already at the right context -> no need to seek
329 context = fExperimentContext;
330 } else {
331 context = seekEvent(request.getIndex());
332 }
333 }
9b635e61 334// Tracer.trace("Ctx: Arming request - done");
a79913eb
FC
335 return context;
336 }
337
338 @SuppressWarnings("unchecked")
339 @Override
340 public T getNext(ITmfContext context) {
341 if (context instanceof TmfExperimentContext) {
342 return (T) getNextEvent((TmfExperimentContext) context);
343 }
344 return null;
345 }
346
347 // ------------------------------------------------------------------------
9f584e4c
FC
348 // ITmfTrace trace positioning
349 // ------------------------------------------------------------------------
350
a79913eb
FC
351 // Returns a brand new context based on the location provided
352 // and initializes the event queues
353 @Override
354 public synchronized TmfExperimentContext seekLocation(ITmfLocation<?> location) {
355 // Validate the location
356 if (location != null && !(location instanceof TmfExperimentLocation)) {
357 return null; // Throw an exception?
358 }
8f50c396 359
a79913eb
FC
360 if (fTraces == null) { // experiment has been disposed
361 return null;
362 }
8f50c396 363
a79913eb 364 // Instantiate the location
12c155f5
FC
365 TmfExperimentLocation expLocation = (location == null) ? new TmfExperimentLocation(new TmfLocationArray(
366 new ITmfLocation<?>[fTraces.length]), new long[fTraces.length]) : (TmfExperimentLocation) location.clone();
8f50c396 367
a79913eb
FC
368 // Create and populate the context's traces contexts
369 TmfExperimentContext context = new TmfExperimentContext(fTraces, new TmfContext[fTraces.length]);
9b635e61
FC
370// Tracer.trace("Ctx: SeekLocation - start");
371
a79913eb
FC
372 long rank = 0;
373 for (int i = 0; i < fTraces.length; i++) {
374 // Get the relevant trace attributes
375 ITmfLocation<?> traceLocation = expLocation.getLocation().locations[i];
376 long traceRank = expLocation.getRanks()[i];
8f50c396 377
a79913eb
FC
378 // Set the corresponding sub-context
379 context.getContexts()[i] = fTraces[i].seekLocation(traceLocation);
380 context.getContexts()[i].setRank(traceRank);
381 rank += traceRank;
8f50c396 382
a79913eb 383 // Set the trace location and read the corresponding event
478e04a4 384 expLocation.getLocation().locations[i] = context.getContexts()[i].getLocation().clone();
a79913eb
FC
385 context.getEvents()[i] = fTraces[i].getNextEvent(context.getContexts()[i]);
386 }
8f50c396 387
9b635e61
FC
388// Tracer.trace("Ctx: SeekLocation - done");
389
a79913eb
FC
390 // Finalize context
391 context.setLocation(expLocation);
392 context.setLastTrace(TmfExperimentContext.NO_TRACE);
393 context.setRank(rank);
9b635e61 394
a79913eb 395 fExperimentContext = context;
9b635e61 396
a79913eb
FC
397 return context;
398 }
9f584e4c 399
a79913eb
FC
400 /*
401 * (non-Javadoc)
402 *
12c155f5 403 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(org.eclipse.linuxtools .tmf.event.TmfTimestamp)
a79913eb
FC
404 */
405 @Override
406 public synchronized TmfExperimentContext seekEvent(TmfTimestamp timestamp) {
9b635e61
FC
407
408// Tracer.trace("Ctx: seekEvent(TS) - start");
8c8bf09f 409
a79913eb
FC
410 if (timestamp == null) {
411 timestamp = TmfTimestamp.BigBang;
412 }
9f584e4c 413
a79913eb
FC
414 // First, find the right checkpoint
415 int index = Collections.binarySearch(fCheckpoints, new TmfCheckpoint(timestamp, null));
9f584e4c
FC
416
417 // In the very likely case that the checkpoint was not found, bsearch
418 // returns its negated would-be location (not an offset...). From that
419 // index, we can then position the stream and get the event.
420 if (index < 0) {
421 index = Math.max(0, -(index + 2));
422 }
423
424 // Position the experiment at the checkpoint
452ad365 425 ITmfLocation<?> location;
9f584e4c 426 synchronized (fCheckpoints) {
a79913eb
FC
427 if (fCheckpoints.size() > 0) {
428 if (index >= fCheckpoints.size()) {
429 index = fCheckpoints.size() - 1;
430 }
431 location = fCheckpoints.elementAt(index).getLocation();
432 } else {
433 location = null;
434 }
9f584e4c
FC
435 }
436
85fb0e54 437 TmfExperimentContext context = seekLocation(location);
cbd4ad82 438 context.setRank((long) index * fIndexPageSize);
9f584e4c 439
a79913eb 440 // And locate the event
fa867360 441 TmfEvent event = parseEvent(context);
9f584e4c 442 while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) {
a79913eb
FC
443 getNextEvent(context);
444 event = parseEvent(context);
9f584e4c
FC
445 }
446
f6b14ce2 447 if (event == null) {
a79913eb
FC
448 context.setLocation(null);
449 context.setRank(ITmfContext.UNKNOWN_RANK);
9b635e61 450 }
f6b14ce2
FC
451
452 return context;
a79913eb 453 }
8c8bf09f 454
a79913eb
FC
455 /*
456 * (non-Javadoc)
457 *
458 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(long)
459 */
460 @Override
461 public synchronized TmfExperimentContext seekEvent(long rank) {
9b635e61
FC
462
463// Tracer.trace("Ctx: seekEvent(rank) - start");
9f584e4c 464
54d55ced
FC
465 // Position the stream at the previous checkpoint
466 int index = (int) rank / fIndexPageSize;
467 ITmfLocation<?> location;
468 synchronized (fCheckpoints) {
a79913eb
FC
469 if (fCheckpoints.size() == 0) {
470 location = null;
471 } else {
472 if (index >= fCheckpoints.size()) {
473 index = fCheckpoints.size() - 1;
474 }
475 location = fCheckpoints.elementAt(index).getLocation();
476 }
54d55ced 477 }
e31e01e8 478
54d55ced 479 TmfExperimentContext context = seekLocation(location);
9b635e61 480 context.setRank((long) index * fIndexPageSize);
54d55ced 481
a79913eb 482 // And locate the event
fa867360 483 TmfEvent event = parseEvent(context);
9b635e61 484 long pos = context.getRank();
fa867360 485 while (event != null && pos++ < rank) {
a79913eb
FC
486 getNextEvent(context);
487 event = parseEvent(context);
54d55ced 488 }
9f584e4c 489
f6b14ce2 490 if (event == null) {
a79913eb
FC
491 context.setLocation(null);
492 context.setRank(ITmfContext.UNKNOWN_RANK);
9b635e61 493 }
f6b14ce2 494
a79913eb
FC
495 return context;
496 }
8c8bf09f 497
c76c54bb
FC
498 @Override
499 public TmfContext seekLocation(double ratio) {
500 TmfContext context = seekEvent((long) (ratio * getNbEvents()));
501 return context;
502 }
503
a79913eb 504 @Override
c76c54bb 505 public double getLocationRatio(ITmfLocation<?> location) {
a79913eb
FC
506 if (location instanceof TmfExperimentLocation) {
507 return (double) seekLocation(location).getRank() / getNbEvents();
508 }
c76c54bb
FC
509 return 0;
510 }
511
a79913eb
FC
512 @Override
513 public ITmfLocation<?> getCurrentLocation() {
514 if (fExperimentContext != null) {
515 return fExperimentContext.getLocation();
516 }
517 return null;
518 }
c76c54bb 519
a79913eb 520 /**
12c155f5 521 * Scan the next events from all traces and return the next one in chronological order.
a79913eb
FC
522 *
523 * @param context
524 * @return
525 */
9b635e61
FC
526
527// private void dumpContext(TmfExperimentContext context, boolean isBefore) {
528
529// TmfContext context0 = context.getContexts()[0];
530// TmfEvent event0 = context.getEvents()[0];
531// TmfExperimentLocation location0 = (TmfExperimentLocation) context.getLocation();
532// long rank0 = context.getRank();
533// int trace = context.getLastTrace();
534//
535// StringBuffer result = new StringBuffer("Ctx: " + (isBefore ? "B " : "A "));
536//
537// result.append("[Ctx: fLoc= " + context0.getLocation().toString() + ", fRnk= " + context0.getRank() + "] ");
538// result.append("[Evt: " + event0.getTimestamp().toString() + "] ");
539// result.append("[Loc: fLoc= " + location0.getLocation()[0].toString() + ", fRnk= " + location0.getRanks()[0] + "] ");
540// result.append("[Rnk: " + rank0 + "], [Trc: " + trace + "]");
541// Tracer.trace(result.toString());
542// }
54d55ced 543
a79913eb
FC
544 @Override
545 public synchronized TmfEvent getNextEvent(TmfContext context) {
546
547 // Validate the context
548 if (!(context instanceof TmfExperimentContext)) {
549 return null; // Throw an exception?
550 }
54d55ced 551
a79913eb 552 if (!context.equals(fExperimentContext)) {
9b635e61 553// Tracer.trace("Ctx: Restoring context");
a79913eb
FC
554 fExperimentContext = seekLocation(context.getLocation());
555 }
556
557 TmfExperimentContext expContext = (TmfExperimentContext) context;
8f50c396 558
9b635e61
FC
559// dumpContext(expContext, true);
560
a79913eb
FC
561 // If an event was consumed previously, get the next one from that trace
562 int lastTrace = expContext.getLastTrace();
563 if (lastTrace != TmfExperimentContext.NO_TRACE) {
564 TmfContext traceContext = expContext.getContexts()[lastTrace];
565 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
566 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
567 }
8f50c396 568
a79913eb
FC
569 // Scan the candidate events and identify the "next" trace to read from
570 TmfEvent eventArray[] = expContext.getEvents();
1324801a
FC
571 if (eventArray == null) {
572 return null;
573 }
a79913eb
FC
574 int trace = TmfExperimentContext.NO_TRACE;
575 TmfTimestamp timestamp = TmfTimestamp.BigCrunch;
1324801a
FC
576 if (eventArray.length == 1) {
577 if (eventArray[0] != null) {
578 timestamp = eventArray[0].getTimestamp();
579 trace = 0;
580 }
581 } else {
582 for (int i = 0; i < eventArray.length; i++) {
583 TmfEvent event = eventArray[i];
584 if (event != null && event.getTimestamp() != null) {
585 TmfTimestamp otherTS = event.getTimestamp();
586 if (otherTS.compareTo(timestamp, true) < 0) {
587 trace = i;
588 timestamp = otherTS;
589 }
590 }
591 }
a79913eb 592 }
1324801a 593 // Update the experiment context and set the "next" event
a79913eb
FC
594 TmfEvent event = null;
595 if (trace != TmfExperimentContext.NO_TRACE) {
596 updateIndex(expContext, timestamp);
82e04272 597
a79913eb
FC
598 TmfContext traceContext = expContext.getContexts()[trace];
599 TmfExperimentLocation expLocation = (TmfExperimentLocation) expContext.getLocation();
64fe8e8a 600// expLocation.getLocation()[trace] = traceContext.getLocation().clone();
478e04a4 601 expLocation.getLocation().locations[trace] = traceContext.getLocation().clone();
82e04272
FC
602
603// updateIndex(expContext, timestamp);
604
a79913eb
FC
605 expLocation.getRanks()[trace] = traceContext.getRank();
606 expContext.setLastTrace(trace);
607 expContext.updateRank(1);
608 event = expContext.getEvents()[trace];
609 fExperimentContext = expContext;
610 }
8f50c396 611
9b635e61
FC
612// if (event != null) {
613// Tracer.trace("Exp: " + (expContext.getRank() - 1) + ": " + event.getTimestamp().toString());
614// dumpContext(expContext, false);
615// Tracer.trace("Ctx: Event returned= " + event.getTimestamp().toString());
616// }
617
a79913eb
FC
618 return event;
619 }
620
621 public synchronized void updateIndex(ITmfContext context, TmfTimestamp timestamp) {
622 // Build the index as we go along
623 long rank = context.getRank();
624 if (context.isValidRank() && (rank % fIndexPageSize) == 0) {
625 // Determine the table position
626 long position = rank / fIndexPageSize;
627 // Add new entry at proper location (if empty)
628 if (fCheckpoints.size() == position) {
629 ITmfLocation<?> location = context.getLocation().clone();
630 fCheckpoints.add(new TmfCheckpoint(timestamp.clone(), location));
12c155f5
FC
631// System.out.println(this + "[" + (fCheckpoints.size() - 1) + "] " + timestamp + ", "
632// + location.toString());
a79913eb
FC
633 }
634 }
635 }
636
637 /*
638 * (non-Javadoc)
639 *
12c155f5 640 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#parseEvent(org.eclipse.linuxtools .tmf.trace.TmfContext)
a79913eb
FC
641 */
642 @Override
643 public TmfEvent parseEvent(TmfContext context) {
644
645 // Validate the context
646 if (!(context instanceof TmfExperimentContext)) {
647 return null; // Throw an exception?
648 }
649
650 if (!context.equals(fExperimentContext)) {
9b635e61 651// Tracer.trace("Ctx: Restoring context");
a79913eb
FC
652 seekLocation(context.getLocation());
653 }
654
655 TmfExperimentContext expContext = (TmfExperimentContext) context;
656
657 // If an event was consumed previously, get the next one from that trace
658 int lastTrace = expContext.getLastTrace();
659 if (lastTrace != TmfExperimentContext.NO_TRACE) {
660 TmfContext traceContext = expContext.getContexts()[lastTrace];
661 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
662 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
663 fExperimentContext = (TmfExperimentContext) context;
664 }
665
666 // Scan the candidate events and identify the "next" trace to read from
667 int trace = TmfExperimentContext.NO_TRACE;
668 TmfTimestamp timestamp = TmfTimestamp.BigCrunch;
669 for (int i = 0; i < expContext.getTraces().length; i++) {
670 TmfEvent event = expContext.getEvents()[i];
671 if (event != null && event.getTimestamp() != null) {
672 TmfTimestamp otherTS = event.getTimestamp();
673 if (otherTS.compareTo(timestamp, true) < 0) {
674 trace = i;
675 timestamp = otherTS;
676 }
677 }
678 }
679
680 TmfEvent event = null;
681 if (trace != TmfExperimentContext.NO_TRACE) {
682 event = expContext.getEvents()[trace];
683 }
684
685 return event;
686 }
687
688 /*
689 * (non-Javadoc)
690 *
691 * @see java.lang.Object#toString()
692 */
693 @Override
3b38ea61 694 @SuppressWarnings("nls")
a79913eb
FC
695 public String toString() {
696 return "[TmfExperiment (" + getName() + ")]";
697 }
8c8bf09f
ASL
698
699 // ------------------------------------------------------------------------
700 // Indexing
701 // ------------------------------------------------------------------------
702
1b70b6dc
PT
703 private synchronized void initializeStreamingMonitor() {
704 if (getStreamingInterval() == 0) {
705 TmfContext context = seekLocation(null);
706 TmfEvent event = getNext(context);
707 if (event == null) {
708 return;
709 }
710 TmfTimeRange timeRange = new TmfTimeRange(event.getTimestamp(), TmfTimestamp.BigCrunch);
711 TmfExperimentRangeUpdatedSignal signal = new TmfExperimentRangeUpdatedSignal(this, this, timeRange);
712 broadcast(signal);
713 return;
714 }
715
716 final Thread thread = new Thread("Streaming Monitor for experiment " + getName()) { //$NON-NLS-1$
717 TmfTimestamp safeTimestamp = null;
718 TmfTimeRange timeRange = null;
719
720 @Override
721 public void run() {
722 while (!fExecutor.isShutdown()) {
723 if (!isIndexingBusy()) {
724 TmfTimestamp startTimestamp = TmfTimestamp.BigCrunch;
725 TmfTimestamp endTimestamp = TmfTimestamp.BigBang;
726 for (ITmfTrace<T> trace : fTraces) {
727 if (trace.getStartTime().compareTo(startTimestamp) < 0) {
728 startTimestamp = trace.getStartTime();
729 }
730 if (trace.getStreamingInterval() != 0 && trace.getEndTime().compareTo(endTimestamp) > 0) {
731 endTimestamp = trace.getEndTime();
732 }
733 }
734 if (safeTimestamp != null && safeTimestamp.compareTo(getTimeRange().getEndTime(), false) > 0) {
735 timeRange = new TmfTimeRange(startTimestamp, safeTimestamp);
736 } else {
737 timeRange = null;
738 }
739 safeTimestamp = endTimestamp;
740 if (timeRange != null) {
741 TmfExperimentRangeUpdatedSignal signal =
742 new TmfExperimentRangeUpdatedSignal(TmfExperiment.this, TmfExperiment.this, timeRange);
743 broadcast(signal);
744 }
745 }
746 try {
747 Thread.sleep(getStreamingInterval());
748 } catch (InterruptedException e) {
749 e.printStackTrace();
750 }
751 }
752 }
753 };
754 thread.start();
755 }
756
757 /* (non-Javadoc)
758 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#getStreamingInterval()
759 */
760 @Override
761 public long getStreamingInterval() {
762 long interval = 0;
763 for (ITmfTrace<T> trace : fTraces) {
764 interval = Math.max(interval, trace.getStreamingInterval());
765 }
766 return interval;
767 }
768
a79913eb 769 /*
12c155f5
FC
770 * The experiment holds the globally ordered events of its set of traces. It is expected to provide access to each
771 * individual event by index i.e. it must be possible to request the Nth event of the experiment.
a79913eb 772 *
12c155f5
FC
773 * The purpose of the index is to keep the information needed to rapidly restore the traces contexts at regular
774 * intervals (every INDEX_PAGE_SIZE event).
a79913eb 775 */
8c8bf09f 776
a79913eb
FC
777 // The index page size
778 private static final int DEFAULT_INDEX_PAGE_SIZE = 5000;
779 protected int fIndexPageSize;
780 protected boolean fIndexing = false;
781 protected TmfTimeRange fIndexingPendingRange = TmfTimeRange.Null;
e31e01e8 782
1b70b6dc
PT
783 private Integer fEndSynchReference;
784
9b635e61
FC
785// private static BufferedWriter fEventLog = null;
786// private static BufferedWriter openLogFile(String filename) {
787// BufferedWriter outfile = null;
788// try {
789// outfile = new BufferedWriter(new FileWriter(filename));
790// } catch (IOException e) {
791// e.printStackTrace();
792// }
793// return outfile;
794// }
795
a79913eb
FC
796 protected boolean isIndexingBusy() {
797 synchronized (fCheckpoints) {
798 return fIndexing;
799 }
800 }
801
802 protected void indexExperiment(boolean waitForCompletion) {
803 indexExperiment(waitForCompletion, 0, TmfTimeRange.Eternity);
804 }
805
806 @SuppressWarnings("unchecked")
807 protected void indexExperiment(boolean waitForCompletion, final int index, final TmfTimeRange timeRange) {
808
809 synchronized (fCheckpoints) {
810 if (fIndexing) {
811 return;
812 }
813 fIndexing = true;
814 }
815
8a0edc79
FC
816 final Job job = new Job("Indexing " + getName() + "...") { //$NON-NLS-1$ //$NON-NLS-2$
817 @Override
818 protected IStatus run(IProgressMonitor monitor) {
819 while (!monitor.isCanceled()) {
820 try {
821 Thread.sleep(100);
822 } catch (InterruptedException e) {
823 return Status.OK_STATUS;
824 }
825 }
826 monitor.done();
827 return Status.OK_STATUS;
828 }
829 };
830 job.schedule();
831
9b635e61 832// fEventLog = openLogFile("TraceEvent.log");
12c155f5 833// System.out.println(System.currentTimeMillis() + ": Experiment indexing started");
550d787e 834
12c155f5
FC
835 ITmfEventRequest<TmfEvent> request = new TmfEventRequest<TmfEvent>(TmfEvent.class, timeRange, index, TmfDataRequest.ALL_DATA,
836 fIndexPageSize, ITmfDataRequest.ExecutionType.BACKGROUND) { // PATA FOREGROUND
550d787e 837
12c155f5 838// long indexingStart = System.nanoTime();
a79913eb
FC
839
840 TmfTimestamp startTime = (fTimeRange == TmfTimeRange.Null) ? null : fTimeRange.getStartTime();
841 TmfTimestamp lastTime = (fTimeRange == TmfTimeRange.Null) ? null : fTimeRange.getEndTime();
842 long initialNbEvents = fNbEvents;
843
844 @Override
845 public void handleStarted() {
846 super.handleStarted();
847 }
848
849 @Override
850 public void handleData(TmfEvent event) {
851 super.handleData(event);
852 if (event != null) {
853 TmfTimestamp ts = event.getTimestamp();
854 if (startTime == null)
855 startTime = new TmfTimestamp(ts);
856 lastTime = new TmfTimestamp(ts);
857 if ((getNbRead() % fIndexPageSize) == 1 && getNbRead() != 1) {
858 updateExperiment();
859 }
860 }
861 }
862
863 @Override
864 public void handleSuccess() {
12c155f5 865// long indexingEnd = System.nanoTime();
9b635e61 866
a79913eb
FC
867 if (getRange() != TmfTimeRange.Eternity) {
868 lastTime = getRange().getEndTime();
869 }
870 updateExperiment();
12c155f5 871// System.out.println(System.currentTimeMillis() + ": Experiment indexing completed");
f9673903 872
12c155f5
FC
873// long average = (indexingEnd - indexingStart) / fNbEvents;
874// System.out.println(getName() + ": start=" + startTime + ", end=" + lastTime + ", elapsed="
875// + (indexingEnd * 1.0 - indexingStart) / 1000000000);
876// System.out.println(getName() + ": nbEvents=" + fNbEvents + " (" + (average / 1000) + "."
877// + (average % 1000) + " us/evt)");
a79913eb
FC
878 super.handleSuccess();
879 }
e31e01e8 880
05bd3318
FC
881 @Override
882 public void handleCompleted() {
883 job.cancel();
a79913eb
FC
884 super.handleCompleted();
885 synchronized (fCheckpoints) {
886 fIndexing = false;
887 if (fIndexingPendingRange != TmfTimeRange.Null) {
888 indexExperiment(false, (int) fNbEvents, fIndexingPendingRange);
889 fIndexingPendingRange = TmfTimeRange.Null;
890 }
891 }
05bd3318
FC
892 }
893
a79913eb
FC
894 private void updateExperiment() {
895 int nbRead = getNbRead();
4dc47e28
FC
896 if (startTime != null) {
897 fTimeRange = new TmfTimeRange(startTime, new TmfTimestamp(lastTime));
898 }
a79913eb 899 if (nbRead != 0) {
0c2a2e08
FC
900// updateTimeRange();
901// updateNbEvents();
a79913eb
FC
902 fNbEvents = initialNbEvents + nbRead;
903 notifyListeners();
904 }
905 }
906 };
907
908 sendRequest((ITmfDataRequest<T>) request);
909 if (waitForCompletion)
910 try {
911 request.waitForCompletion();
912 } catch (InterruptedException e) {
913 e.printStackTrace();
914 }
915 }
916
917 protected void notifyListeners() {
918 broadcast(new TmfExperimentUpdatedSignal(this, this)); // , null));
1b70b6dc 919 //broadcast(new TmfExperimentRangeUpdatedSignal(this, this, fTimeRange)); // , null));
a79913eb
FC
920 }
921
8c8bf09f
ASL
922 // ------------------------------------------------------------------------
923 // Signal handlers
924 // ------------------------------------------------------------------------
925
926 @TmfSignalHandler
951d134a 927 public void experimentSelected(TmfExperimentSelectedSignal<T> signal) {
a79913eb
FC
928 TmfExperiment<?> experiment = signal.getExperiment();
929 if (experiment == this) {
930 setCurrentExperiment(experiment);
1b70b6dc 931 fEndSynchReference = new Integer(signal.getReference());
a79913eb 932 }
8c8bf09f
ASL
933 }
934
1b70b6dc
PT
935 @TmfSignalHandler
936 public void endSync(TmfEndSynchSignal signal) {
937 if (fEndSynchReference != null && fEndSynchReference.intValue() == signal.getReference()) {
938 fEndSynchReference = null;
939 initializeStreamingMonitor();
940 }
941
942 }
943
8c8bf09f
ASL
944 @TmfSignalHandler
945 public void experimentUpdated(TmfExperimentUpdatedSignal signal) {
8c8bf09f
ASL
946 }
947
1b70b6dc
PT
948 @TmfSignalHandler
949 public void experimentRangeUpdated(TmfExperimentRangeUpdatedSignal signal) {
950 indexExperiment(false, (int) fNbEvents, signal.getRange());
951 }
952
8c8bf09f
ASL
953 @TmfSignalHandler
954 public void traceUpdated(TmfTraceUpdatedSignal signal) {
12c155f5 955 for (ITmfTrace<T> trace : fTraces) {
a79913eb
FC
956 if (trace == signal.getTrace()) {
957 synchronized (fCheckpoints) {
958 if (fIndexing) {
959 if (fIndexingPendingRange == TmfTimeRange.Null) {
960 fIndexingPendingRange = signal.getRange();
961 } else {
962 TmfTimestamp startTime = fIndexingPendingRange.getStartTime();
963 TmfTimestamp endTime = fIndexingPendingRange.getEndTime();
964 if (signal.getRange().getStartTime().compareTo(startTime) < 0) {
965 startTime = signal.getRange().getStartTime();
966 }
967 if (signal.getRange().getEndTime().compareTo(endTime) > 0) {
968 endTime = signal.getRange().getEndTime();
969 }
970 fIndexingPendingRange = new TmfTimeRange(startTime, endTime);
971 }
972 return;
973 }
974 }
975 indexExperiment(false, (int) fNbEvents, signal.getRange());
976 return;
977 }
978 }
8c8bf09f
ASL
979 }
980
12c155f5
FC
981 @Override
982 public String getPath() {
983 // TODO Auto-generated method stub
984 return null;
985 }
986
4dc47e28 987}
This page took 0.084665 seconds and 5 git commands to generate.