2010-11-09 Francois Chouinard <fchouinard@gmail.com> Contribution for Bug315307
[deliverable/tracecompass.git] / org.eclipse.linuxtools.tmf / src / org / eclipse / linuxtools / tmf / experiment / TmfExperiment.java
1 /*******************************************************************************
2 * Copyright (c) 2009, 2010 Ericsson
3 *
4 * All rights reserved. This program and the accompanying materials are
5 * made available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Francois Chouinard - Initial API and implementation
11 *******************************************************************************/
12
13 package org.eclipse.linuxtools.tmf.experiment;
14
15 import java.util.Collections;
16 import java.util.Vector;
17
18 import org.eclipse.linuxtools.tmf.component.TmfEventProvider;
19 import org.eclipse.linuxtools.tmf.event.TmfEvent;
20 import org.eclipse.linuxtools.tmf.event.TmfTimeRange;
21 import org.eclipse.linuxtools.tmf.event.TmfTimestamp;
22 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest;
23 import org.eclipse.linuxtools.tmf.request.ITmfDataRequest.ExecutionType;
24 import org.eclipse.linuxtools.tmf.request.ITmfEventRequest;
25 import org.eclipse.linuxtools.tmf.request.TmfDataRequest;
26 import org.eclipse.linuxtools.tmf.request.TmfEventRequest;
27 import org.eclipse.linuxtools.tmf.signal.TmfExperimentSelectedSignal;
28 import org.eclipse.linuxtools.tmf.signal.TmfExperimentUpdatedSignal;
29 import org.eclipse.linuxtools.tmf.signal.TmfSignalHandler;
30 import org.eclipse.linuxtools.tmf.signal.TmfSignalManager;
31 import org.eclipse.linuxtools.tmf.signal.TmfTraceUpdatedSignal;
32 import org.eclipse.linuxtools.tmf.trace.ITmfContext;
33 import org.eclipse.linuxtools.tmf.trace.ITmfLocation;
34 import org.eclipse.linuxtools.tmf.trace.ITmfTrace;
35 import org.eclipse.linuxtools.tmf.trace.TmfCheckpoint;
36 import org.eclipse.linuxtools.tmf.trace.TmfContext;
37
38 /**
39 * <b><u>TmfExperiment</u></b>
40 * <p>
41 * TmfExperiment presents a time-ordered, unified view of a set of TmfTraces
42 * that are part of a tracing experiment.
43 * <p>
44 */
45 public class TmfExperiment<T extends TmfEvent> extends TmfEventProvider<T> implements ITmfTrace {
46
47 // ------------------------------------------------------------------------
48 // Attributes
49 // ------------------------------------------------------------------------
50
51 // The currently selected experiment
52 protected static TmfExperiment<?> fCurrentExperiment = null;
53
54 // The set of traces that constitute the experiment
55 protected ITmfTrace[] fTraces;
56
57 // The total number of events
58 protected long fNbEvents;
59
60 // The experiment time range
61 protected TmfTimeRange fTimeRange;
62
63 // The experiment reference timestamp (default: Zero)
64 protected TmfTimestamp fEpoch;
65
66 // The experiment index
67 protected Vector<TmfCheckpoint> fCheckpoints = new Vector<TmfCheckpoint>();
68
69 // The current experiment context
70 protected TmfExperimentContext fExperimentContext;
71
72 // ------------------------------------------------------------------------
73 // Constructors
74 // ------------------------------------------------------------------------
75
76 /**
77 * @param type
78 * @param id
79 * @param traces
80 * @param epoch
81 * @param indexPageSize
82 */
83 public TmfExperiment(Class<T> type, String id, ITmfTrace[] traces, TmfTimestamp epoch, int indexPageSize) {
84 this(type, id, traces, TmfTimestamp.Zero, indexPageSize, false);
85 }
86
87 public TmfExperiment(Class<T> type, String id, ITmfTrace[] traces, TmfTimestamp epoch, int indexPageSize, boolean preIndexExperiment) {
88 super(id, type);
89
90 fTraces = traces;
91 fEpoch = epoch;
92 fIndexPageSize = indexPageSize;
93
94 if (preIndexExperiment) indexExperiment(true);
95
96 updateTimeRange();
97 }
98
99 protected TmfExperiment(String id, Class<T> type) {
100 super(id, type);
101 }
102
103 /**
104 * @param type
105 * @param id
106 * @param traces
107 */
108 public TmfExperiment(Class<T> type, String id, ITmfTrace[] traces) {
109 this(type, id, traces, TmfTimestamp.Zero, DEFAULT_INDEX_PAGE_SIZE);
110 }
111
112 /**
113 * @param type
114 * @param id
115 * @param traces
116 * @param indexPageSize
117 */
118 public TmfExperiment(Class<T> type, String id, ITmfTrace[] traces, int indexPageSize) {
119 this(type, id, traces, TmfTimestamp.Zero, indexPageSize);
120 }
121
122 /**
123 * Copy constructor
124 * @param other
125 */
126 public TmfExperiment(TmfExperiment<T> other) {
127 super(other.getName() + "(clone)", other.fType); //$NON-NLS-1$
128
129 fEpoch = other.fEpoch;
130 fIndexPageSize = other.fIndexPageSize;
131
132 fTraces = new ITmfTrace[other.fTraces.length];
133 for (int trace = 0; trace < other.fTraces.length; trace++) {
134 fTraces[trace] = other.fTraces[trace].createTraceCopy();
135 }
136
137 fNbEvents = other.fNbEvents;
138 fTimeRange = other.fTimeRange;
139 }
140
141 @Override
142 public TmfExperiment<T> createTraceCopy() {
143 TmfExperiment<T> experiment = new TmfExperiment<T>(this);
144 TmfSignalManager.deregister(experiment);
145 return experiment;
146 }
147
148 /**
149 * Clears the experiment
150 */
151 @Override
152 public synchronized void dispose() {
153 if (fTraces != null) {
154 for (ITmfTrace trace : fTraces) {
155 trace.dispose();
156 }
157 fTraces = null;
158 }
159 if (fCheckpoints != null) {
160 fCheckpoints.clear();
161 }
162 super.dispose();
163 }
164
165 // ------------------------------------------------------------------------
166 // ITmfTrace
167 // ------------------------------------------------------------------------
168
169 @Override
170 public String getPath() {
171 return null;
172 }
173
174 @Override
175 public long getNbEvents() {
176 return fNbEvents;
177 }
178
179 @Override
180 public int getCacheSize() {
181 return fIndexPageSize;
182 }
183
184 @Override
185 public TmfTimeRange getTimeRange() {
186 return fTimeRange;
187 }
188
189 @Override
190 public TmfTimestamp getStartTime() {
191 return fTimeRange.getStartTime();
192 }
193
194 @Override
195 public TmfTimestamp getEndTime() {
196 return fTimeRange.getEndTime();
197 }
198
199 public Vector<TmfCheckpoint> getCheckpoints() {
200 return fCheckpoints;
201 }
202
203 // ------------------------------------------------------------------------
204 // Accessors
205 // ------------------------------------------------------------------------
206
207 public static void setCurrentExperiment(TmfExperiment<?> experiment) {
208 fCurrentExperiment = experiment;
209 }
210
211 public static TmfExperiment<?> getCurrentExperiment() {
212 return fCurrentExperiment;
213 }
214
215 public TmfTimestamp getEpoch() {
216 return fEpoch;
217 }
218
219 public ITmfTrace[] getTraces() {
220 return fTraces;
221 }
222
223 /**
224 * Returns the rank of the first event with the requested timestamp.
225 * If none, returns the index of the next event (if any).
226 *
227 * @param timestamp
228 * @return
229 */
230 @Override
231 public long getRank(TmfTimestamp timestamp) {
232 TmfExperimentContext context = seekEvent(timestamp);
233 return context.getRank();
234 }
235
236 /**
237 * Returns the timestamp of the event at the requested index.
238 * If none, returns null.
239 *
240 * @param index
241 * @return
242 */
243 public TmfTimestamp getTimestamp(int index) {
244 TmfExperimentContext context = seekEvent(index);
245 TmfEvent event = getNextEvent(context);
246 return (event != null) ? event.getTimestamp() : null;
247 }
248
249 // ------------------------------------------------------------------------
250 // Operators
251 // ------------------------------------------------------------------------
252
253 /**
254 * Update the total number of events
255 */
256 private void updateNbEvents() {
257 int nbEvents = 0;
258 for (ITmfTrace trace : fTraces) {
259 nbEvents += trace.getNbEvents();
260 }
261 fNbEvents = nbEvents;
262 }
263
264 /**
265 * Update the global time range
266 */
267 private void updateTimeRange() {
268 TmfTimestamp startTime = fTimeRange != null ? fTimeRange.getStartTime() : TmfTimestamp.BigCrunch;
269 TmfTimestamp endTime = fTimeRange != null ? fTimeRange.getEndTime() : TmfTimestamp.BigBang;
270
271 for (ITmfTrace trace : fTraces) {
272 TmfTimestamp traceStartTime = trace.getStartTime();
273 if (traceStartTime.compareTo(startTime, true) < 0)
274 startTime = traceStartTime;
275 TmfTimestamp traceEndTime = trace.getEndTime();
276 if (traceEndTime.compareTo(endTime, true) > 0)
277 endTime = traceEndTime;
278 }
279 fTimeRange = new TmfTimeRange(startTime, endTime);
280 }
281
282 // ------------------------------------------------------------------------
283 // TmfProvider
284 // ------------------------------------------------------------------------
285
286 @Override
287 public ITmfContext armRequest(ITmfDataRequest<T> request) {
288 // Tracer.trace("Ctx: Arming request - start");
289 TmfTimestamp timestamp = (request instanceof ITmfEventRequest<?>) ?
290 ((ITmfEventRequest<T>) request).getRange().getStartTime() : null;
291 TmfExperimentContext context = (timestamp != null) ?
292 seekEvent(timestamp) : seekEvent(request.getIndex());
293 // Tracer.trace("Ctx: Arming request - done");
294 return context;
295 }
296
297 @SuppressWarnings("unchecked")
298 @Override
299 public T getNext(ITmfContext context) {
300 if (context instanceof TmfExperimentContext) {
301 return (T) getNextEvent((TmfExperimentContext) context);
302 }
303 return null;
304 }
305
306 // ------------------------------------------------------------------------
307 // ITmfTrace trace positioning
308 // ------------------------------------------------------------------------
309
310 // Returns a brand new context based on the location provided
311 // and initializes the event queues
312 @Override
313 public synchronized TmfExperimentContext seekLocation(ITmfLocation<?> location) {
314
315 // Validate the location
316 if (location != null && !(location instanceof TmfExperimentLocation)) {
317 return null; // Throw an exception?
318 }
319
320 // Instantiate the location
321 TmfExperimentLocation expLocation = (location == null)
322 ? new TmfExperimentLocation(new ITmfLocation<?>[fTraces.length], new long[fTraces.length])
323 : (TmfExperimentLocation) location.clone();
324
325 // Create and populate the context's traces contexts
326 TmfExperimentContext context = new TmfExperimentContext(fTraces, new TmfContext[fTraces.length]);
327 // Tracer.trace("Ctx: SeekLocation - start");
328
329 long rank = 0;
330 for (int i = 0; i < fTraces.length; i++) {
331 // Get the relevant trace attributes
332 ITmfLocation<?> traceLocation = expLocation.getLocation()[i];
333 long traceRank = expLocation.getRanks()[i];
334
335 // Set the corresponding sub-context
336 context.getContexts()[i] = fTraces[i].seekLocation(traceLocation);
337 context.getContexts()[i].setRank(traceRank);
338 rank += traceRank;
339
340 // Set the trace location and read the corresponding event
341 expLocation.getLocation()[i] = context.getContexts()[i].getLocation();
342 context.getEvents()[i] = fTraces[i].getNextEvent(context.getContexts()[i]);
343 }
344
345 // Tracer.trace("Ctx: SeekLocation - done");
346
347 // Finalize context
348 context.setLocation(expLocation);
349 context.setLastTrace(TmfExperimentContext.NO_TRACE);
350 context.setRank(rank);
351
352 fExperimentContext = context;
353
354 return context;
355 }
356
357 /* (non-Javadoc)
358 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(org.eclipse.linuxtools.tmf.event.TmfTimestamp)
359 */
360 @Override
361 public synchronized TmfExperimentContext seekEvent(TmfTimestamp timestamp) {
362
363 // Tracer.trace("Ctx: seekEvent(TS) - start");
364
365 if (timestamp == null) {
366 timestamp = TmfTimestamp.BigBang;
367 }
368
369 // First, find the right checkpoint
370 int index = Collections.binarySearch(fCheckpoints, new TmfCheckpoint(timestamp, null));
371
372 // In the very likely case that the checkpoint was not found, bsearch
373 // returns its negated would-be location (not an offset...). From that
374 // index, we can then position the stream and get the event.
375 if (index < 0) {
376 index = Math.max(0, -(index + 2));
377 }
378
379 // Position the experiment at the checkpoint
380 ITmfLocation<?> location;
381 synchronized (fCheckpoints) {
382 if (fCheckpoints.size() > 0) {
383 if (index >= fCheckpoints.size()) {
384 index = fCheckpoints.size() - 1;
385 }
386 location = fCheckpoints.elementAt(index).getLocation();
387 }
388 else {
389 location = null;
390 }
391 }
392
393 TmfExperimentContext context = seekLocation(location);
394 context.setRank((long) index * fIndexPageSize);
395
396 // And locate the event
397 TmfEvent event = parseEvent(context);
398 while (event != null && event.getTimestamp().compareTo(timestamp, false) < 0) {
399 getNextEvent(context);
400 event = parseEvent(context);
401 }
402
403 if (event == null) {
404 context.setLocation(null);
405 context.setRank(ITmfContext.UNKNOWN_RANK);
406 }
407
408 return context;
409 }
410
411 /* (non-Javadoc)
412 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#seekEvent(long)
413 */
414 @Override
415 public synchronized TmfExperimentContext seekEvent(long rank) {
416
417 // Tracer.trace("Ctx: seekEvent(rank) - start");
418
419 // Position the stream at the previous checkpoint
420 int index = (int) rank / fIndexPageSize;
421 ITmfLocation<?> location;
422 synchronized (fCheckpoints) {
423 if (fCheckpoints.size() == 0) {
424 location = null;
425 }
426 else {
427 if (index >= fCheckpoints.size()) {
428 index = fCheckpoints.size() - 1;
429 }
430 location = fCheckpoints.elementAt(index).getLocation();
431 }
432 }
433
434 TmfExperimentContext context = seekLocation(location);
435 context.setRank((long) index * fIndexPageSize);
436
437 // And locate the event
438 TmfEvent event = parseEvent(context);
439 long pos = context.getRank();
440 while (event != null && pos++ < rank) {
441 getNextEvent(context);
442 event = parseEvent(context);
443 }
444
445 if (event == null) {
446 context.setLocation(null);
447 context.setRank(ITmfContext.UNKNOWN_RANK);
448 }
449
450 return context;
451 }
452
453 /**
454 * Scan the next events from all traces and return the next one
455 * in chronological order.
456 *
457 * @param context
458 * @return
459 */
460
461 // private void dumpContext(TmfExperimentContext context, boolean isBefore) {
462
463 // TmfContext context0 = context.getContexts()[0];
464 // TmfEvent event0 = context.getEvents()[0];
465 // TmfExperimentLocation location0 = (TmfExperimentLocation) context.getLocation();
466 // long rank0 = context.getRank();
467 // int trace = context.getLastTrace();
468 //
469 // StringBuffer result = new StringBuffer("Ctx: " + (isBefore ? "B " : "A "));
470 //
471 // result.append("[Ctx: fLoc= " + context0.getLocation().toString() + ", fRnk= " + context0.getRank() + "] ");
472 // result.append("[Evt: " + event0.getTimestamp().toString() + "] ");
473 // result.append("[Loc: fLoc= " + location0.getLocation()[0].toString() + ", fRnk= " + location0.getRanks()[0] + "] ");
474 // result.append("[Rnk: " + rank0 + "], [Trc: " + trace + "]");
475 // Tracer.trace(result.toString());
476 // }
477
478 @Override
479 public synchronized TmfEvent getNextEvent(TmfContext context) {
480
481 // Validate the context
482 if (!(context instanceof TmfExperimentContext)) {
483 return null; // Throw an exception?
484 }
485
486 if (!context.equals(fExperimentContext)) {
487 // Tracer.trace("Ctx: Restoring context");
488 seekLocation(context.getLocation());
489 }
490
491 TmfExperimentContext expContext = (TmfExperimentContext) context;
492
493 // dumpContext(expContext, true);
494
495 // If an event was consumed previously, get the next one from that trace
496 int lastTrace = expContext.getLastTrace();
497 if (lastTrace != TmfExperimentContext.NO_TRACE) {
498 TmfContext traceContext = expContext.getContexts()[lastTrace];
499 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
500 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
501 }
502
503 // Scan the candidate events and identify the "next" trace to read from
504 int trace = TmfExperimentContext.NO_TRACE;
505 TmfTimestamp timestamp = TmfTimestamp.BigCrunch;
506 for (int i = 0; i < expContext.getTraces().length; i++) {
507 TmfEvent event = expContext.getEvents()[i];
508 if (event != null && event.getTimestamp() != null) {
509 TmfTimestamp otherTS = event.getTimestamp();
510 if (otherTS.compareTo(timestamp, true) < 0) {
511 trace = i;
512 timestamp = otherTS;
513 }
514 }
515 }
516
517 // Update the experiment context and set the "next" event
518 TmfEvent event = null;
519 if (trace != TmfExperimentContext.NO_TRACE) {
520 updateIndex(expContext, timestamp);
521
522 TmfContext traceContext = expContext.getContexts()[trace];
523 TmfExperimentLocation expLocation = (TmfExperimentLocation) expContext.getLocation();
524 // expLocation.getLocation()[trace] = traceContext.getLocation().clone();
525 expLocation.getLocation()[trace] = traceContext.getLocation();
526
527 // updateIndex(expContext, timestamp);
528
529 expLocation.getRanks()[trace] = traceContext.getRank();
530 expContext.setLastTrace(trace);
531 expContext.updateRank(1);
532 event = expContext.getEvents()[trace];
533 }
534
535 // if (event != null) {
536 // Tracer.trace("Exp: " + (expContext.getRank() - 1) + ": " + event.getTimestamp().toString());
537 // dumpContext(expContext, false);
538 // Tracer.trace("Ctx: Event returned= " + event.getTimestamp().toString());
539 // }
540
541 return event;
542 }
543
544 public synchronized void updateIndex(ITmfContext context, TmfTimestamp timestamp) {
545 // Build the index as we go along
546 long rank = context.getRank();
547 if (context.isValidRank() && (rank % fIndexPageSize) == 0) {
548 // Determine the table position
549 long position = rank / fIndexPageSize;
550 // Add new entry at proper location (if empty)
551 if (fCheckpoints.size() == position) {
552 ITmfLocation<?> location = context.getLocation().clone();
553 fCheckpoints.add(new TmfCheckpoint(timestamp.clone(), location));
554 // System.out.println(this + "[" + (fCheckpoints.size() - 1) + "] " + timestamp + ", " + location.toString());
555 }
556 }
557 }
558
559 /* (non-Javadoc)
560 * @see org.eclipse.linuxtools.tmf.trace.ITmfTrace#parseEvent(org.eclipse.linuxtools.tmf.trace.TmfContext)
561 */
562 @Override
563 public TmfEvent parseEvent(TmfContext context) {
564
565 // Validate the context
566 if (!(context instanceof TmfExperimentContext)) {
567 return null; // Throw an exception?
568 }
569
570 if (!context.equals(fExperimentContext)) {
571 // Tracer.trace("Ctx: Restoring context");
572 seekLocation(context.getLocation());
573 }
574
575 TmfExperimentContext expContext = (TmfExperimentContext) context;
576
577 // If an event was consumed previously, get the next one from that trace
578 int lastTrace = expContext.getLastTrace();
579 if (lastTrace != TmfExperimentContext.NO_TRACE) {
580 TmfContext traceContext = expContext.getContexts()[lastTrace];
581 expContext.getEvents()[lastTrace] = expContext.getTraces()[lastTrace].getNextEvent(traceContext);
582 expContext.setLastTrace(TmfExperimentContext.NO_TRACE);
583 fExperimentContext = (TmfExperimentContext) context;
584 }
585
586 // Scan the candidate events and identify the "next" trace to read from
587 int trace = TmfExperimentContext.NO_TRACE;
588 TmfTimestamp timestamp = TmfTimestamp.BigCrunch;
589 for (int i = 0; i < expContext.getTraces().length; i++) {
590 TmfEvent event = expContext.getEvents()[i];
591 if (event != null && event.getTimestamp() != null) {
592 TmfTimestamp otherTS = event.getTimestamp();
593 if (otherTS.compareTo(timestamp, true) < 0) {
594 trace = i;
595 timestamp = otherTS;
596 }
597 }
598 }
599
600 TmfEvent event = null;
601 if (trace != TmfExperimentContext.NO_TRACE) {
602 event = expContext.getEvents()[trace];
603 }
604
605 return event;
606 }
607
608 /* (non-Javadoc)
609 * @see java.lang.Object#toString()
610 */
611 @Override
612 @SuppressWarnings("nls")
613 public String toString() {
614 return "[TmfExperiment (" + getName() + ")]";
615 }
616
617 // ------------------------------------------------------------------------
618 // Indexing
619 // ------------------------------------------------------------------------
620
621 /*
622 * The experiment holds the globally ordered events of its set of traces.
623 * It is expected to provide access to each individual event by index i.e.
624 * it must be possible to request the Nth event of the experiment.
625 *
626 * The purpose of the index is to keep the information needed to rapidly
627 * restore the traces contexts at regular intervals (every INDEX_PAGE_SIZE
628 * event).
629 */
630
631 // The index page size
632 private static final int DEFAULT_INDEX_PAGE_SIZE = 5000;
633 protected int fIndexPageSize;
634
635 // private static BufferedWriter fEventLog = null;
636 // private static BufferedWriter openLogFile(String filename) {
637 // BufferedWriter outfile = null;
638 // try {
639 // outfile = new BufferedWriter(new FileWriter(filename));
640 // } catch (IOException e) {
641 // e.printStackTrace();
642 // }
643 // return outfile;
644 // }
645
646 @SuppressWarnings("unchecked")
647 private void indexExperiment(boolean waitForCompletion) {
648
649 fCheckpoints.clear();
650
651 // fEventLog = openLogFile("TraceEvent.log");
652 // System.out.println(System.currentTimeMillis() + ": Experiment indexing started");
653
654 ITmfEventRequest<TmfEvent> request = new TmfEventRequest<TmfEvent>(TmfEvent.class, TmfTimeRange.Eternity,
655 TmfDataRequest.ALL_DATA, 1, ITmfDataRequest.ExecutionType.BACKGROUND) {
656
657 // long indexingStart = System.nanoTime();
658
659 TmfTimestamp startTime = null;
660 TmfTimestamp lastTime = null;
661
662 @Override
663 public void handleData(TmfEvent event) {
664 super.handleData(event);
665 if (event != null) {
666 TmfTimestamp ts = event.getTimestamp();
667 if (startTime == null)
668 startTime = new TmfTimestamp(ts);
669 lastTime = new TmfTimestamp(ts);
670
671 if ((getNbRead() % fIndexPageSize) == 0) {
672 updateExperiment();
673 }
674 }
675 }
676
677 @Override
678 public void handleSuccess() {
679 // long indexingEnd = System.nanoTime();
680
681 updateExperiment();
682 // System.out.println(System.currentTimeMillis() + ": Experiment indexing completed");
683
684 // long average = (indexingEnd - indexingStart) / fNbEvents;
685 // System.out.println(getName() + ": start=" + startTime + ", end=" + lastTime + ", elapsed=" + (indexingEnd * 1.0 - indexingStart) / 1000000000);
686 // System.out.println(getName() + ": nbEvents=" + fNbEvents + " (" + (average / 1000) + "." + (average % 1000) + " us/evt)");
687 }
688
689 private void updateExperiment() {
690 int nbRead = getNbRead();
691 if (nbRead != 0) {
692 // updateTimeRange();
693 // updateNbEvents();
694 fTimeRange = new TmfTimeRange(startTime, new TmfTimestamp(lastTime));
695 fNbEvents = nbRead;
696 notifyListeners();
697 }
698 }
699 };
700
701 sendRequest((ITmfDataRequest<T>) request);
702 if (waitForCompletion)
703 try {
704 request.waitForCompletion();
705 } catch (InterruptedException e) {
706 e.printStackTrace();
707 }
708 }
709
710 protected void notifyListeners() {
711 broadcast(new TmfExperimentUpdatedSignal(this, this)); // , null));
712 }
713
714 // ------------------------------------------------------------------------
715 // Signal handlers
716 // ------------------------------------------------------------------------
717
718 @TmfSignalHandler
719 public void experimentSelected(TmfExperimentSelectedSignal<T> signal) {
720 TmfExperiment<?> experiment = signal.getExperiment();
721 if (experiment == this) {
722 setCurrentExperiment(experiment);
723 indexExperiment(false);
724 }
725 else {
726 dispose();
727 }
728 }
729
730 @TmfSignalHandler
731 public void experimentUpdated(TmfExperimentUpdatedSignal signal) {
732 }
733
734 @TmfSignalHandler
735 public void traceUpdated(TmfTraceUpdatedSignal signal) {
736 // TODO: Incremental index update
737 synchronized(this) {
738 updateNbEvents();
739 updateTimeRange();
740 }
741 broadcast(new TmfExperimentUpdatedSignal(this, this)); // , signal.getTrace()));
742 }
743
744 // ------------------------------------------------------------------------
745 // TmfDataProvider
746 // ------------------------------------------------------------------------
747
748 @Override
749 protected void queueBackgroundRequest(final ITmfDataRequest<T> request, final int blockSize, final boolean indexing) {
750
751 // TODO: Handle the data requests also...
752 if (!(request instanceof ITmfEventRequest<?>)) {
753 super.queueRequest(request);
754 return;
755 }
756 final ITmfEventRequest<T> eventRequest = (ITmfEventRequest<T>) request;
757
758 Thread thread = new Thread() {
759 @Override
760 public void run() {
761
762 // final long requestStart = System.nanoTime();
763
764 final Integer[] CHUNK_SIZE = new Integer[1];
765 CHUNK_SIZE[0] = blockSize + ((indexing) ? 1 : 0);
766
767 final Integer[] nbRead = new Integer[1];
768 nbRead[0] = 0;
769
770 // final TmfTimestamp[] timestamp = new TmfTimestamp[1];
771 // timestamp[0] = new TmfTimestamp(eventRequest.getRange().getStartTime());
772 // final TmfTimestamp endTS = eventRequest.getRange().getEndTime();
773
774 final Boolean[] isFinished = new Boolean[1];
775 isFinished[0] = Boolean.FALSE;
776
777 while (!isFinished[0]) {
778
779 // TmfEventRequest<T> subRequest = new TmfEventRequest<T>(eventRequest.getDataType(), new TmfTimeRange(timestamp[0], endTS), CHUNK_SIZE[0], eventRequest.getBlockize(), ExecutionType.BACKGROUND)
780 // TmfDataRequest<T> subRequest = new TmfDataRequest<T>(eventRequest.getDataType(), nbRead[0], CHUNK_SIZE[0], eventRequest.getBlockize(), ExecutionType.BACKGROUND)
781 TmfDataRequest<T> subRequest = new TmfDataRequest<T>(eventRequest.getDataType(), nbRead[0], CHUNK_SIZE[0], ExecutionType.BACKGROUND)
782 {
783 @Override
784 public void handleData(T data) {
785 super.handleData(data);
786 eventRequest.handleData(data);
787 if (getNbRead() == CHUNK_SIZE[0]) {
788 nbRead[0] += getNbRead();
789 }
790 if (getNbRead() > CHUNK_SIZE[0]) {
791 System.out.println("ERROR - Read too many events");
792 }
793 }
794
795 @Override
796 public void handleCompleted() {
797 // System.out.println("Request completed at: " + timestamp[0]);
798 if (getNbRead() < CHUNK_SIZE[0]) {
799 if (isCancelled()) {
800 eventRequest.cancel();
801 }
802 else {
803 eventRequest.done();
804 }
805 isFinished[0] = Boolean.TRUE;
806 nbRead[0] += getNbRead();
807 // System.out.println("fNbRead=" + getNbRead() + " total=" + nbRead[0]);
808 }
809 super.handleCompleted();
810 }
811 };
812
813 if (!isFinished[0]) {
814 queueRequest(subRequest);
815
816 try {
817 subRequest.waitForCompletion();
818 // System.out.println("Finished at " + timestamp[0]);
819 } catch (InterruptedException e) {
820 e.printStackTrace();
821 }
822
823 // TmfTimestamp newTS = new TmfTimestamp(timestamp[0].getValue() + 1, timestamp[0].getScale(), timestamp[0].getPrecision());
824 // timestamp[0] = newTS;
825 CHUNK_SIZE[0] = blockSize;
826 // System.out.println("New timestamp: " + timestamp[0]);
827 }
828 }
829 // final long requestEnded = System.nanoTime();
830 // System.out.println("Background request completed. Elapsed= " + (requestEnded * 1.0 - requestStart) / 1000000000);
831 }
832 };
833
834 thread.start();
835 }
836
837 }
This page took 0.049759 seconds and 5 git commands to generate.