6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 #include <babeltrace/babeltrace.h>
31 #include <babeltrace/context.h>
32 #include <babeltrace/context-internal.h>
33 #include <babeltrace/iterator-internal.h>
34 #include <babeltrace/iterator.h>
35 #include <babeltrace/prio_heap.h>
36 #include <babeltrace/ctf/metadata.h>
37 #include <babeltrace/ctf/events.h>
40 static int babeltrace_filestream_seek(struct ctf_file_stream
*file_stream
,
41 const struct bt_iter_pos
*begin_pos
,
42 unsigned long stream_id
);
44 struct stream_saved_pos
{
46 * Use file_stream pointer to check if the trace collection we
47 * restore to match the one we saved from, for each stream.
49 struct ctf_file_stream
*file_stream
;
50 size_t cur_index
; /* current index in packet index */
51 ssize_t offset
; /* offset from base, in bits. EOF for end of file. */
52 uint64_t current_real_timestamp
;
53 uint64_t current_cycles_timestamp
;
57 struct trace_collection
*tc
;
58 GArray
*stream_saved_pos
; /* Contains struct stream_saved_pos */
61 static int stream_read_event(struct ctf_file_stream
*sin
)
65 ret
= sin
->pos
.parent
.event_cb(&sin
->pos
.parent
, &sin
->parent
);
68 else if (ret
== EAGAIN
)
69 /* Stream is inactive for now (live reading). */
72 fprintf(stderr
, "[error] Reading event failed.\n");
79 * Return true if a < b, false otherwise.
80 * If time stamps are exactly the same, compare by stream path. This
81 * ensures we get the same result between runs on the same trace
82 * collection on different environments.
83 * The result will be random for memory-mapped traces since there is no
84 * fixed path leading to those (they have empty path string).
86 static int stream_compare(void *a
, void *b
)
88 struct ctf_file_stream
*s_a
= a
, *s_b
= b
;
90 if (s_a
->parent
.real_timestamp
< s_b
->parent
.real_timestamp
) {
92 } else if (likely(s_a
->parent
.real_timestamp
> s_b
->parent
.real_timestamp
)) {
95 return strcmp(s_a
->parent
.path
, s_b
->parent
.path
);
99 void bt_iter_free_pos(struct bt_iter_pos
*iter_pos
)
104 if (iter_pos
->type
== BT_SEEK_RESTORE
&& iter_pos
->u
.restore
) {
105 if (iter_pos
->u
.restore
->stream_saved_pos
) {
107 iter_pos
->u
.restore
->stream_saved_pos
,
110 g_free(iter_pos
->u
.restore
);
116 * seek_file_stream_by_timestamp
118 * Browse a filestream by index, if an index contains the timestamp passed in
119 * argument, seek inside the corresponding packet it until we find the event we
120 * are looking for (either the exact timestamp or the event just after the
123 * Return 0 if the seek succeded, EOF if we didn't find any packet
124 * containing the timestamp, or a positive integer for error.
126 * TODO: this should be turned into a binary search! It is currently
127 * doing a linear search in the packets. This is a O(n) operation on a
128 * very frequent code path.
130 static int seek_file_stream_by_timestamp(struct ctf_file_stream
*cfs
,
133 struct ctf_stream_pos
*stream_pos
;
134 struct packet_index
*index
;
137 stream_pos
= &cfs
->pos
;
138 for (i
= 0; i
< stream_pos
->packet_index
->len
; i
++) {
139 index
= &g_array_index(stream_pos
->packet_index
,
140 struct packet_index
, i
);
141 if (index
->ts_real
.timestamp_end
< timestamp
)
144 stream_pos
->packet_seek(&stream_pos
->parent
, i
, SEEK_SET
);
146 ret
= stream_read_event(cfs
);
147 } while (cfs
->parent
.real_timestamp
< timestamp
&& ret
== 0);
149 /* Can return either EOF, 0, or error (> 0). */
153 * Cannot find the timestamp within the stream packets, return
160 * seek_ctf_trace_by_timestamp : for each file stream, seek to the event with
161 * the corresponding timestamp
163 * Return 0 on success.
164 * If the timestamp is not part of any file stream, return EOF to inform the
165 * user the timestamp is out of the scope.
166 * On other errors, return positive value.
168 static int seek_ctf_trace_by_timestamp(struct ctf_trace
*tin
,
169 uint64_t timestamp
, struct ptr_heap
*stream_heap
)
174 /* for each stream_class */
175 for (i
= 0; i
< tin
->streams
->len
; i
++) {
176 struct ctf_stream_declaration
*stream_class
;
178 stream_class
= g_ptr_array_index(tin
->streams
, i
);
181 /* for each file_stream */
182 for (j
= 0; j
< stream_class
->streams
->len
; j
++) {
183 struct ctf_stream_definition
*stream
;
184 struct ctf_file_stream
*cfs
;
186 stream
= g_ptr_array_index(stream_class
->streams
, j
);
189 cfs
= container_of(stream
, struct ctf_file_stream
,
191 ret
= seek_file_stream_by_timestamp(cfs
, timestamp
);
194 ret
= bt_heap_insert(stream_heap
, cfs
);
196 /* Return positive error. */
200 } else if (ret
> 0) {
202 * Error in seek (not EOF), failure.
206 /* on EOF just do not put stream into heap. */
210 return found
? 0 : EOF
;
214 * Find timestamp of last event in the stream.
216 * Return value: 0 if OK, positive error value on error, EOF if no
219 static int find_max_timestamp_ctf_file_stream(struct ctf_file_stream
*cfs
,
220 uint64_t *timestamp_end
)
222 int ret
, count
= 0, i
;
223 uint64_t timestamp
= 0;
224 struct ctf_stream_pos
*stream_pos
;
226 stream_pos
= &cfs
->pos
;
228 * We start by the last packet, and iterate backwards until we
229 * either find at least one event, or we reach the first packet
230 * (some packets can be empty).
232 for (i
= stream_pos
->packet_index
->len
- 1; i
>= 0; i
--) {
233 stream_pos
->packet_seek(&stream_pos
->parent
, i
, SEEK_SET
);
235 /* read each event until we reach the end of the stream */
237 ret
= stream_read_event(cfs
);
240 timestamp
= cfs
->parent
.real_timestamp
;
253 *timestamp_end
= timestamp
;
256 /* Return EOF if no events were found */
264 * Find the stream within a stream class that contains the event with
265 * the largest timestamp, and save that timestamp.
267 * Return 0 if OK, EOF if no events were found in the streams, or
268 * positive value on error.
270 static int find_max_timestamp_ctf_stream_class(
271 struct ctf_stream_declaration
*stream_class
,
272 struct ctf_file_stream
**cfsp
,
273 uint64_t *max_timestamp
)
275 int ret
= EOF
, i
, found
= 0;
277 for (i
= 0; i
< stream_class
->streams
->len
; i
++) {
278 struct ctf_stream_definition
*stream
;
279 struct ctf_file_stream
*cfs
;
280 uint64_t current_max_ts
= 0;
282 stream
= g_ptr_array_index(stream_class
->streams
, i
);
285 cfs
= container_of(stream
, struct ctf_file_stream
, parent
);
286 ret
= find_max_timestamp_ctf_file_stream(cfs
, ¤t_max_ts
);
291 if (current_max_ts
>= *max_timestamp
) {
292 *max_timestamp
= current_max_ts
;
297 assert(ret
>= 0 || ret
== EOF
);
305 * seek_last_ctf_trace_collection: seek trace collection to last event.
307 * Return 0 if OK, EOF if no events were found, or positive error value
310 static int seek_last_ctf_trace_collection(struct trace_collection
*tc
,
311 struct ctf_file_stream
**cfsp
)
315 uint64_t max_timestamp
= 0;
320 /* For each trace in the trace_collection */
321 for (i
= 0; i
< tc
->array
->len
; i
++) {
322 struct ctf_trace
*tin
;
323 struct bt_trace_descriptor
*td_read
;
325 td_read
= g_ptr_array_index(tc
->array
, i
);
328 tin
= container_of(td_read
, struct ctf_trace
, parent
);
329 /* For each stream_class in the trace */
330 for (j
= 0; j
< tin
->streams
->len
; j
++) {
331 struct ctf_stream_declaration
*stream_class
;
333 stream_class
= g_ptr_array_index(tin
->streams
, j
);
336 ret
= find_max_timestamp_ctf_stream_class(stream_class
,
337 cfsp
, &max_timestamp
);
342 assert(ret
== EOF
|| ret
== 0);
346 * Now we know in which file stream the last event is located,
347 * and we know its timestamp.
352 ret
= seek_file_stream_by_timestamp(*cfsp
, max_timestamp
);
359 int bt_iter_set_pos(struct bt_iter
*iter
, const struct bt_iter_pos
*iter_pos
)
361 struct trace_collection
*tc
;
364 if (!iter
|| !iter_pos
)
367 switch (iter_pos
->type
) {
368 case BT_SEEK_RESTORE
:
369 if (!iter_pos
->u
.restore
)
372 bt_heap_free(iter
->stream_heap
);
373 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
375 goto error_heap_init
;
377 for (i
= 0; i
< iter_pos
->u
.restore
->stream_saved_pos
->len
;
379 struct stream_saved_pos
*saved_pos
;
380 struct ctf_stream_pos
*stream_pos
;
381 struct ctf_stream_definition
*stream
;
383 saved_pos
= &g_array_index(
384 iter_pos
->u
.restore
->stream_saved_pos
,
385 struct stream_saved_pos
, i
);
386 stream
= &saved_pos
->file_stream
->parent
;
387 stream_pos
= &saved_pos
->file_stream
->pos
;
389 stream_pos
->packet_seek(&stream_pos
->parent
,
390 saved_pos
->cur_index
, SEEK_SET
);
393 * the timestamp needs to be restored after
394 * packet_seek, because this function resets
395 * the timestamp to the beginning of the packet
397 stream
->real_timestamp
= saved_pos
->current_real_timestamp
;
398 stream
->cycles_timestamp
= saved_pos
->current_cycles_timestamp
;
399 stream_pos
->offset
= saved_pos
->offset
;
400 stream_pos
->last_offset
= LAST_OFFSET_POISON
;
402 stream
->current
.real
.begin
= 0;
403 stream
->current
.real
.end
= 0;
404 stream
->current
.cycles
.begin
= 0;
405 stream
->current
.cycles
.end
= 0;
407 stream
->prev
.real
.begin
= 0;
408 stream
->prev
.real
.end
= 0;
409 stream
->prev
.cycles
.begin
= 0;
410 stream
->prev
.cycles
.end
= 0;
412 printf_debug("restored to cur_index = %" PRId64
" and "
413 "offset = %" PRId64
", timestamp = %" PRIu64
"\n",
414 stream_pos
->cur_index
,
415 stream_pos
->offset
, stream
->real_timestamp
);
417 ret
= stream_read_event(saved_pos
->file_stream
);
423 ret
= bt_heap_insert(iter
->stream_heap
,
424 saved_pos
->file_stream
);
432 bt_heap_free(iter
->stream_heap
);
433 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
435 goto error_heap_init
;
437 /* for each trace in the trace_collection */
438 for (i
= 0; i
< tc
->array
->len
; i
++) {
439 struct ctf_trace
*tin
;
440 struct bt_trace_descriptor
*td_read
;
442 td_read
= g_ptr_array_index(tc
->array
, i
);
445 tin
= container_of(td_read
, struct ctf_trace
, parent
);
447 ret
= seek_ctf_trace_by_timestamp(tin
,
448 iter_pos
->u
.seek_time
,
451 * Positive errors are failure. Negative value
452 * is EOF (for which we continue with other
453 * traces). 0 is success. Note: on EOF, it just
454 * means that no stream has been added to the
455 * iterator for that trace, which is fine.
457 if (ret
!= 0 && ret
!= EOF
)
463 bt_heap_free(iter
->stream_heap
);
464 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
466 goto error_heap_init
;
468 for (i
= 0; i
< tc
->array
->len
; i
++) {
469 struct ctf_trace
*tin
;
470 struct bt_trace_descriptor
*td_read
;
473 td_read
= g_ptr_array_index(tc
->array
, i
);
476 tin
= container_of(td_read
, struct ctf_trace
, parent
);
478 /* Populate heap with each stream */
479 for (stream_id
= 0; stream_id
< tin
->streams
->len
;
481 struct ctf_stream_declaration
*stream
;
484 stream
= g_ptr_array_index(tin
->streams
,
488 for (filenr
= 0; filenr
< stream
->streams
->len
;
490 struct ctf_file_stream
*file_stream
;
491 file_stream
= g_ptr_array_index(
496 ret
= babeltrace_filestream_seek(
497 file_stream
, iter_pos
,
499 if (ret
!= 0 && ret
!= EOF
) {
503 /* Do not add EOF streams */
506 ret
= bt_heap_insert(iter
->stream_heap
, file_stream
);
515 struct ctf_file_stream
*cfs
= NULL
;
518 ret
= seek_last_ctf_trace_collection(tc
, &cfs
);
519 if (ret
!= 0 || !cfs
)
521 /* remove all streams from the heap */
522 bt_heap_free(iter
->stream_heap
);
523 /* Create a new empty heap */
524 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
527 /* Insert the stream that contains the last event */
528 ret
= bt_heap_insert(iter
->stream_heap
, cfs
);
534 /* not implemented */
541 bt_heap_free(iter
->stream_heap
);
543 if (bt_heap_init(iter
->stream_heap
, 0, stream_compare
) < 0) {
544 bt_heap_free(iter
->stream_heap
);
545 g_free(iter
->stream_heap
);
546 iter
->stream_heap
= NULL
;
553 struct bt_iter_pos
*bt_iter_get_pos(struct bt_iter
*iter
)
555 struct bt_iter_pos
*pos
;
556 struct trace_collection
*tc
;
557 struct ctf_file_stream
*file_stream
= NULL
, *removed
;
558 struct ptr_heap iter_heap_copy
;
565 pos
= g_new0(struct bt_iter_pos
, 1);
566 pos
->type
= BT_SEEK_RESTORE
;
567 pos
->u
.restore
= g_new0(struct bt_saved_pos
, 1);
568 pos
->u
.restore
->tc
= tc
;
569 pos
->u
.restore
->stream_saved_pos
= g_array_new(FALSE
, TRUE
,
570 sizeof(struct stream_saved_pos
));
571 if (!pos
->u
.restore
->stream_saved_pos
)
574 ret
= bt_heap_copy(&iter_heap_copy
, iter
->stream_heap
);
578 /* iterate over each stream in the heap */
579 file_stream
= bt_heap_maximum(&iter_heap_copy
);
580 while (file_stream
!= NULL
) {
581 struct stream_saved_pos saved_pos
;
583 assert(file_stream
->pos
.last_offset
!= LAST_OFFSET_POISON
);
584 saved_pos
.offset
= file_stream
->pos
.last_offset
;
585 saved_pos
.file_stream
= file_stream
;
586 saved_pos
.cur_index
= file_stream
->pos
.cur_index
;
588 saved_pos
.current_real_timestamp
= file_stream
->parent
.real_timestamp
;
589 saved_pos
.current_cycles_timestamp
= file_stream
->parent
.cycles_timestamp
;
592 pos
->u
.restore
->stream_saved_pos
,
595 printf_debug("stream : %" PRIu64
", cur_index : %zd, "
597 "timestamp = %" PRIu64
"\n",
598 file_stream
->parent
.stream_id
,
599 saved_pos
.cur_index
, saved_pos
.offset
,
600 saved_pos
.current_real_timestamp
);
602 /* remove the stream from the heap copy */
603 removed
= bt_heap_remove(&iter_heap_copy
);
604 assert(removed
== file_stream
);
606 file_stream
= bt_heap_maximum(&iter_heap_copy
);
608 bt_heap_free(&iter_heap_copy
);
612 g_array_free(pos
->u
.restore
->stream_saved_pos
, TRUE
);
618 struct bt_iter_pos
*bt_iter_create_time_pos(struct bt_iter
*unused
,
621 struct bt_iter_pos
*pos
;
623 pos
= g_new0(struct bt_iter_pos
, 1);
624 pos
->type
= BT_SEEK_TIME
;
625 pos
->u
.seek_time
= timestamp
;
630 * babeltrace_filestream_seek: seek a filestream to given position.
632 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
634 static int babeltrace_filestream_seek(struct ctf_file_stream
*file_stream
,
635 const struct bt_iter_pos
*begin_pos
,
636 unsigned long stream_id
)
640 if (!file_stream
|| !begin_pos
)
643 switch (begin_pos
->type
) {
646 * just insert into the heap we should already know
651 file_stream
->pos
.packet_seek(&file_stream
->pos
.parent
,
653 ret
= stream_read_event(file_stream
);
656 case BT_SEEK_RESTORE
:
658 assert(0); /* Not yet defined */
664 int bt_iter_add_trace(struct bt_iter
*iter
,
665 struct bt_trace_descriptor
*td_read
)
667 struct ctf_trace
*tin
;
668 int stream_id
, ret
= 0;
670 tin
= container_of(td_read
, struct ctf_trace
, parent
);
672 /* Populate heap with each stream */
673 for (stream_id
= 0; stream_id
< tin
->streams
->len
;
675 struct ctf_stream_declaration
*stream
;
678 stream
= g_ptr_array_index(tin
->streams
, stream_id
);
681 for (filenr
= 0; filenr
< stream
->streams
->len
;
683 struct ctf_file_stream
*file_stream
;
684 struct bt_iter_pos pos
;
686 file_stream
= g_ptr_array_index(stream
->streams
,
691 pos
.type
= BT_SEEK_BEGIN
;
692 ret
= babeltrace_filestream_seek(file_stream
,
698 } else if (ret
!= 0 && ret
!= EAGAIN
) {
702 ret
= bt_heap_insert(iter
->stream_heap
, file_stream
);
712 int bt_iter_init(struct bt_iter
*iter
,
713 struct bt_context
*ctx
,
714 const struct bt_iter_pos
*begin_pos
,
715 const struct bt_iter_pos
*end_pos
)
720 if (!iter
|| !ctx
|| !ctx
->tc
|| !ctx
->tc
->array
)
723 if (ctx
->current_iterator
) {
728 iter
->stream_heap
= g_new(struct ptr_heap
, 1);
729 iter
->end_pos
= end_pos
;
733 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
735 goto error_heap_init
;
737 for (i
= 0; i
< ctx
->tc
->array
->len
; i
++) {
738 struct bt_trace_descriptor
*td_read
;
740 td_read
= g_ptr_array_index(ctx
->tc
->array
, i
);
743 ret
= bt_iter_add_trace(iter
, td_read
);
748 ctx
->current_iterator
= iter
;
749 if (begin_pos
&& begin_pos
->type
!= BT_SEEK_BEGIN
) {
750 ret
= bt_iter_set_pos(iter
, begin_pos
);
756 bt_heap_free(iter
->stream_heap
);
758 g_free(iter
->stream_heap
);
759 iter
->stream_heap
= NULL
;
764 struct bt_iter
*bt_iter_create(struct bt_context
*ctx
,
765 const struct bt_iter_pos
*begin_pos
,
766 const struct bt_iter_pos
*end_pos
)
768 struct bt_iter
*iter
;
774 iter
= g_new0(struct bt_iter
, 1);
775 ret
= bt_iter_init(iter
, ctx
, begin_pos
, end_pos
);
783 void bt_iter_fini(struct bt_iter
*iter
)
786 if (iter
->stream_heap
) {
787 bt_heap_free(iter
->stream_heap
);
788 g_free(iter
->stream_heap
);
790 iter
->ctx
->current_iterator
= NULL
;
791 bt_context_put(iter
->ctx
);
794 void bt_iter_destroy(struct bt_iter
*iter
)
801 int bt_iter_next(struct bt_iter
*iter
)
803 struct ctf_file_stream
*file_stream
, *removed
;
809 file_stream
= bt_heap_maximum(iter
->stream_heap
);
811 /* end of file for all streams */
816 ret
= stream_read_event(file_stream
);
818 removed
= bt_heap_remove(iter
->stream_heap
);
819 assert(removed
== file_stream
);
822 } else if (ret
== EAGAIN
) {
824 * Live streaming: the stream is inactive for now, we
825 * just updated the timestamp_end to skip over this
826 * stream up to a certain point in time.
828 * Since we can't guarantee that a stream will ever have
829 * any activity, we can't rely on the fact that
830 * bt_iter_next will be called for each stream and deal
831 * with inactive streams. So instead, we return 0 here
832 * to the caller and let the read API handle the
842 /* Reinsert the file stream into the heap, and rebalance. */
843 removed
= bt_heap_replace_max(iter
->stream_heap
, file_stream
);
844 assert(removed
== file_stream
);