ctf: Fix priority list containing closed streams
[deliverable/tracecompass.git] / ctf / org.eclipse.tracecompass.ctf.core / src / org / eclipse / tracecompass / ctf / core / trace / CTFTraceReader.java
1 /*******************************************************************************
2 * Copyright (c) 2011, 2014 Ericsson, Ecole Polytechnique de Montreal and others
3 *
4 * All rights reserved. This program and the accompanying materials are made
5 * available under the terms of the Eclipse Public License v1.0 which
6 * accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
8 *
9 * Contributors:
10 * Matthew Khouzam - Initial API and implementation
11 * Alexandre Montplaisir - Initial API and implementation
12 *******************************************************************************/
13
14 package org.eclipse.tracecompass.ctf.core.trace;
15
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.PriorityQueue;
22 import java.util.Set;
23
24 import org.eclipse.tracecompass.ctf.core.CTFException;
25 import org.eclipse.tracecompass.ctf.core.event.EventDefinition;
26 import org.eclipse.tracecompass.ctf.core.event.IEventDeclaration;
27 import org.eclipse.tracecompass.internal.ctf.core.Activator;
28 import org.eclipse.tracecompass.internal.ctf.core.trace.StreamInputReaderTimestampComparator;
29
30 import com.google.common.collect.ImmutableSet;
31 import com.google.common.collect.ImmutableSet.Builder;
32
33 /**
34 * A CTF trace reader. Reads the events of a trace.
35 *
36 * @version 1.0
37 * @author Matthew Khouzam
38 * @author Alexandre Montplaisir
39 */
40 public class CTFTraceReader implements AutoCloseable {
41
42 private static final int LINE_LENGTH = 60;
43
44 private static final int MIN_PRIO_SIZE = 16;
45
46 // ------------------------------------------------------------------------
47 // Attributes
48 // ------------------------------------------------------------------------
49
50 /**
51 * The trace to read from.
52 */
53 private final CTFTrace fTrace;
54
55 /**
56 * Vector of all the trace file readers.
57 */
58 private final List<CTFStreamInputReader> fStreamInputReaders =
59 Collections.synchronizedList(new ArrayList<CTFStreamInputReader>());
60
61 /**
62 * Priority queue to order the trace file readers by timestamp.
63 */
64 private PriorityQueue<CTFStreamInputReader> fPrio;
65
66 /**
67 * Array to count the number of event per trace file.
68 */
69 private long[] fEventCountPerTraceFile;
70
71 /**
72 * Timestamp of the first event in the trace
73 */
74 private long fStartTime;
75
76 /**
77 * Timestamp of the last event read so far
78 */
79 private long fEndTime;
80
81 /**
82 * Boolean to indicate if the CTFTraceReader has been closed
83 */
84 private boolean fClosed = false;
85
86 // ------------------------------------------------------------------------
87 // Constructors
88 // ------------------------------------------------------------------------
89
90 /**
91 * Constructs a TraceReader to read a trace.
92 *
93 * @param trace
94 * The trace to read from.
95 * @throws CTFException
96 * if an error occurs
97 */
98 public CTFTraceReader(CTFTrace trace) throws CTFException {
99 fTrace = trace;
100 fStreamInputReaders.clear();
101
102 /**
103 * Create the trace file readers.
104 */
105 createStreamInputReaders();
106
107 /**
108 * Populate the timestamp-based priority queue.
109 */
110 populateStreamInputReaderHeap();
111
112 /**
113 * Get the start Time of this trace bear in mind that the trace could be
114 * empty.
115 */
116 fStartTime = 0;
117 if (hasMoreEvents()) {
118 fStartTime = getTopStream().getCurrentEvent().getTimestamp();
119 setEndTime(fStartTime);
120 }
121 }
122
123 /**
124 * Copy constructor
125 *
126 * @return The new CTFTraceReader
127 * @throws CTFException
128 * if an error occurs
129 */
130 public CTFTraceReader copyFrom() throws CTFException {
131 CTFTraceReader newReader = null;
132
133 newReader = new CTFTraceReader(fTrace);
134 newReader.fStartTime = fStartTime;
135 newReader.setEndTime(fEndTime);
136 return newReader;
137 }
138
139 /**
140 * Dispose the CTFTraceReader
141 */
142 @Override
143 public void close() {
144 synchronized (fStreamInputReaders) {
145 for (CTFStreamInputReader reader : fStreamInputReaders) {
146 if (reader != null) {
147 try {
148 reader.close();
149 } catch (IOException e) {
150 Activator.logError(e.getMessage(), e);
151 }
152 }
153 }
154 fStreamInputReaders.clear();
155 }
156 fPrio.clear();
157 fClosed = true;
158 }
159
160 // ------------------------------------------------------------------------
161 // Getters/Setters/Predicates
162 // ------------------------------------------------------------------------
163
164 /**
165 * Return the start time of this trace (== timestamp of the first event)
166 *
167 * @return the trace start time
168 */
169 public long getStartTime() {
170 return fStartTime;
171 }
172
173 /**
174 * Set the trace's end time
175 *
176 * @param endTime
177 * The end time to use
178 */
179 protected final void setEndTime(long endTime) {
180 fEndTime = endTime;
181 }
182
183 /**
184 * Get the priority queue of this trace reader.
185 *
186 * @return The priority queue of input readers
187 */
188 protected PriorityQueue<CTFStreamInputReader> getPrio() {
189 return fPrio;
190 }
191
192 // ------------------------------------------------------------------------
193 // Operations
194 // ------------------------------------------------------------------------
195
196 /**
197 * Creates one trace file reader per trace file contained in the trace.
198 *
199 * @throws CTFException
200 * if an error occurs
201 */
202 private void createStreamInputReaders() throws CTFException {
203 /*
204 * For each stream.
205 */
206 for (CTFStream stream : fTrace.getStreams()) {
207 Set<CTFStreamInput> streamInputs = stream.getStreamInputs();
208
209 /*
210 * For each trace file of the stream.
211 */
212 for (CTFStreamInput streamInput : streamInputs) {
213
214 /*
215 * Create a reader and add it to the group.
216 */
217 fStreamInputReaders.add(new CTFStreamInputReader(streamInput));
218 }
219 }
220
221 /*
222 * Create the array to count the number of event per trace file.
223 */
224 fEventCountPerTraceFile = new long[fStreamInputReaders.size()];
225 }
226
227 /**
228 * Returns whether or not this CTFTraceReader has been closed
229 *
230 * @return true if it has been closed, false else
231 * @since 1.1
232 */
233 public boolean isClosed() {
234 return fClosed;
235 }
236
237 /**
238 * Update the priority queue to make it match the parent trace
239 *
240 * @throws CTFException
241 * An error occured
242 */
243 public void update() throws CTFException {
244 Set<CTFStreamInputReader> readers = new HashSet<>();
245 for (CTFStream stream : fTrace.getStreams()) {
246 Set<CTFStreamInput> streamInputs = stream.getStreamInputs();
247 for (CTFStreamInput streamInput : streamInputs) {
248 /*
249 * Create a reader.
250 */
251 CTFStreamInputReader streamInputReader = new CTFStreamInputReader(
252 streamInput);
253
254 /*
255 * Add it to the group.
256 */
257 if (!fStreamInputReaders.contains(streamInputReader)) {
258 streamInputReader.readNextEvent();
259 fStreamInputReaders.add(streamInputReader);
260 readers.add(streamInputReader);
261 }
262 }
263 }
264 long[] temp = fEventCountPerTraceFile;
265 fEventCountPerTraceFile = new long[readers.size() + temp.length];
266 for (CTFStreamInputReader reader : readers) {
267 fPrio.add(reader);
268 }
269 for (int i = 0; i < temp.length; i++) {
270 fEventCountPerTraceFile[i] = temp[i];
271 }
272 }
273
274 /**
275 * Gets an iterable of the stream input readers, useful for foreaches
276 *
277 * @return the iterable of the stream input readers
278 */
279 public Iterable<IEventDeclaration> getEventDeclarations() {
280 ImmutableSet.Builder<IEventDeclaration> builder = new Builder<>();
281 for (CTFStreamInputReader sir : fStreamInputReaders) {
282 builder.addAll(sir.getEventDeclarations());
283 }
284 return builder.build();
285 }
286
287 /**
288 * Initializes the priority queue used to choose the trace file with the
289 * lower next event timestamp.
290 *
291 * @throws CTFException
292 * if an error occurs
293 */
294 private void populateStreamInputReaderHeap() throws CTFException {
295 if (fStreamInputReaders.isEmpty()) {
296 fPrio = new PriorityQueue<>(MIN_PRIO_SIZE,
297 new StreamInputReaderTimestampComparator());
298 return;
299 }
300
301 /*
302 * Create the priority queue with a size twice as bigger as the number
303 * of reader in order to avoid constant resizing.
304 */
305 fPrio = new PriorityQueue<>(
306 Math.max(fStreamInputReaders.size() * 2, MIN_PRIO_SIZE),
307 new StreamInputReaderTimestampComparator());
308
309 int pos = 0;
310
311 for (CTFStreamInputReader reader : fStreamInputReaders) {
312 /*
313 * Add each trace file reader in the priority queue, if we are able
314 * to read an event from it.
315 */
316 reader.setParent(this);
317 CTFResponse readNextEvent = reader.readNextEvent();
318 if (readNextEvent == CTFResponse.OK || readNextEvent == CTFResponse.WAIT) {
319 fPrio.add(reader);
320
321 fEventCountPerTraceFile[pos] = 0;
322 reader.setName(pos);
323
324 pos++;
325 }
326 }
327 }
328
329 /**
330 * Get the current event, which is the current event of the trace file
331 * reader with the lowest timestamp.
332 *
333 * @return An event definition, or null of the trace reader reached the end
334 * of the trace.
335 */
336 public EventDefinition getCurrentEventDef() {
337 CTFStreamInputReader top = getTopStream();
338 return (top != null) ? top.getCurrentEvent() : null;
339 }
340
341 /**
342 * Go to the next event.
343 *
344 * @return True if an event was read.
345 * @throws CTFException
346 * if an error occurs
347 */
348 public boolean advance() throws CTFException {
349 /*
350 * Remove the reader from the top of the priority queue.
351 */
352 CTFStreamInputReader top = fPrio.poll();
353
354 /*
355 * If the queue was empty.
356 */
357 if (top == null) {
358 return false;
359 }
360 /*
361 * Read the next event of this reader.
362 */
363 switch (top.readNextEvent()) {
364 case OK: {
365 /*
366 * Add it back in the queue.
367 */
368 fPrio.add(top);
369 final long topEnd = fTrace.timestampCyclesToNanos(top.getCurrentEvent().getTimestamp());
370 setEndTime(Math.max(topEnd, getEndTime()));
371 fEventCountPerTraceFile[top.getName()]++;
372
373 if (top.getCurrentEvent() != null) {
374 fEndTime = Math.max(top.getCurrentEvent().getTimestamp(),
375 fEndTime);
376 }
377 break;
378 }
379 case WAIT: {
380 fPrio.add(top);
381 break;
382 }
383 case FINISH:
384 break;
385 case ERROR:
386 default:
387 // something bad happend
388 }
389 /*
390 * If there is no reader in the queue, it means the trace reader reached
391 * the end of the trace.
392 */
393 return hasMoreEvents();
394 }
395
396 /**
397 * Go to the last event in the trace.
398 *
399 * @throws CTFException
400 * if an error occurs
401 */
402 public void goToLastEvent() throws CTFException {
403 seek(getEndTime());
404 while (fPrio.size() > 1) {
405 advance();
406 }
407 }
408
409 /**
410 * Seeks to a given timestamp. It will seek to the nearest event greater or
411 * equal to timestamp. If a trace is [10 20 30 40] and you are looking for
412 * 19, it will give you 20. If you want 20, you will get 20, if you want 21,
413 * you will get 30. The value -inf will seek to the first element and the
414 * value +inf will seek to the end of the file (past the last event).
415 *
416 * @param timestamp
417 * the timestamp to seek to
418 * @return true if there are events above or equal the seek timestamp, false
419 * if seek at the end of the trace (no valid event).
420 * @throws CTFException
421 * if an error occurs
422 */
423 public boolean seek(long timestamp) throws CTFException {
424 /*
425 * Remove all the trace readers from the priority queue
426 */
427 fPrio.clear();
428 for (CTFStreamInputReader streamInputReader : fStreamInputReaders) {
429 /*
430 * Seek the trace reader.
431 */
432 streamInputReader.seek(timestamp);
433
434 /*
435 * Add it to the priority queue if there is a current event.
436 */
437 if (streamInputReader.getCurrentEvent() != null) {
438 fPrio.add(streamInputReader);
439 }
440 }
441 return hasMoreEvents();
442 }
443
444 /**
445 * Gets the stream with the oldest event
446 *
447 * @return the stream with the oldest event
448 */
449 public CTFStreamInputReader getTopStream() {
450 return fPrio.peek();
451 }
452
453 /**
454 * Does the trace have more events?
455 *
456 * @return true if yes.
457 */
458 public final boolean hasMoreEvents() {
459 return fPrio.size() > 0;
460 }
461
462 /**
463 * Prints the event count stats.
464 */
465 public void printStats() {
466 printStats(LINE_LENGTH);
467 }
468
469 /**
470 * Prints the event count stats.
471 *
472 * @param width
473 * Width of the display.
474 */
475 public void printStats(int width) {
476 int numEvents = 0;
477 if (width == 0) {
478 return;
479 }
480
481 for (long i : fEventCountPerTraceFile) {
482 numEvents += i;
483 }
484
485 for (int j = 0; j < fEventCountPerTraceFile.length; j++) {
486 CTFStreamInputReader se = fStreamInputReaders.get(j);
487
488 long len = (width * fEventCountPerTraceFile[se.getName()])
489 / numEvents;
490
491 StringBuilder sb = new StringBuilder(se.getFilename());
492 sb.append("\t["); //$NON-NLS-1$
493
494 for (int i = 0; i < len; i++) {
495 sb.append('+');
496 }
497
498 for (long i = len; i < width; i++) {
499 sb.append(' ');
500 }
501
502 sb.append("]\t" + fEventCountPerTraceFile[se.getName()] + " Events"); //$NON-NLS-1$//$NON-NLS-2$
503 Activator.log(sb.toString());
504 }
505 }
506
507 /**
508 * Gets the last event timestamp that was read. This is NOT necessarily the
509 * last event in a trace, just the last one read so far.
510 *
511 * @return the last event
512 */
513 public long getEndTime() {
514 return fEndTime;
515 }
516
517 /**
518 * Sets a trace to be live or not
519 *
520 * @param live
521 * whether the trace is live
522 */
523 public void setLive(boolean live) {
524 for (CTFStreamInputReader s : fPrio) {
525 s.setLive(live);
526 }
527 }
528
529 /**
530 * Get if the trace is to read live or not
531 *
532 * @return whether the trace is live or not
533 */
534 public boolean isLive() {
535 return getTopStream().isLive();
536 }
537
538 @Override
539 public int hashCode() {
540 final int prime = 31;
541 int result = 1;
542 result = (prime * result) + (int) (fStartTime ^ (fStartTime >>> 32));
543 result = (prime * result) + fStreamInputReaders.hashCode();
544 result = (prime * result) + ((fTrace == null) ? 0 : fTrace.hashCode());
545 return result;
546 }
547
548 @Override
549 public boolean equals(Object obj) {
550 if (this == obj) {
551 return true;
552 }
553 if (obj == null) {
554 return false;
555 }
556 if (!(obj instanceof CTFTraceReader)) {
557 return false;
558 }
559 CTFTraceReader other = (CTFTraceReader) obj;
560 if (!fStreamInputReaders.equals(other.fStreamInputReaders)) {
561 return false;
562 }
563 if (fTrace == null) {
564 if (other.fTrace != null) {
565 return false;
566 }
567 } else if (!fTrace.equals(other.fTrace)) {
568 return false;
569 }
570 return true;
571 }
572
573 @Override
574 public String toString() {
575 /* Only for debugging, shouldn't be externalized */
576 return "CTFTraceReader [trace=" + fTrace + ']'; //$NON-NLS-1$
577 }
578
579 /**
580 * Gets the parent trace
581 *
582 * @return the parent trace
583 */
584 public CTFTrace getTrace() {
585 return fTrace;
586 }
587
588 /**
589 * This will read the entire trace and populate all the indexes. The reader
590 * will then be reset to the first event in the trace.
591 *
592 * Do not call in the fast path.
593 *
594 * @throws CTFException
595 * A trace reading error occurred
596 * @since 1.0
597 */
598 public void populateIndex() throws CTFException {
599 for (CTFStreamInputReader sir : fPrio) {
600 sir.goToLastEvent();
601 }
602 seek(0);
603
604 }
605 }
This page took 0.089346 seconds and 6 git commands to generate.