6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 #include <babeltrace/babeltrace.h>
31 #include <babeltrace/babeltrace-internal.h>
32 #include <babeltrace/context.h>
33 #include <babeltrace/context-internal.h>
34 #include <babeltrace/iterator-internal.h>
35 #include <babeltrace/iterator.h>
36 #include <babeltrace/prio_heap.h>
37 #include <babeltrace/ctf/metadata.h>
38 #include <babeltrace/ctf/events.h>
/*
 * Forward declaration. The definition appears further down in this file;
 * bt_iter_set_pos() calls this function before that point.
 */
41 static int babeltrace_filestream_seek(struct ctf_file_stream
*file_stream
,
42 const struct bt_iter_pos
*begin_pos
,
43 unsigned long stream_id
);
/*
 * Saved read position of one file stream. Instances are filled in by
 * bt_iter_get_pos() and read back by the BT_SEEK_RESTORE case of
 * bt_iter_set_pos().
 */
45 struct stream_saved_pos
{
47 * Use file_stream pointer to check if the trace collection we
48 * restore to match the one we saved from, for each stream.
50 struct ctf_file_stream
*file_stream
;
51 size_t cur_index
; /* current index in packet index */
52 ssize_t offset
; /* offset from base, in bits. EOF for end of file. */
/* Stream timestamps captured when the position was saved. */
53 uint64_t current_real_timestamp
;
54 uint64_t current_cycles_timestamp
;
/*
 * Fields of the restore record. NOTE(review): the struct header is not
 * visible in this excerpt; bt_iter_get_pos() allocates a struct
 * bt_saved_pos and assigns exactly these two members, so they presumably
 * belong to that struct. TODO confirm.
 */
58 struct trace_collection
*tc
;
59 GArray
*stream_saved_pos
; /* Contains struct stream_saved_pos */
/*
 * Read the next event of a file stream by invoking the event callback
 * stored in its stream position (sin->pos.parent.event_cb).
 * The visible branches single out EAGAIN (inactive live stream) and a
 * failure that is reported on stderr. Callers in this file also compare
 * the result against 0 and EOF.
 */
62 static int stream_read_event(struct ctf_file_stream
*sin
)
/* The callback receives the position parent and the stream definition. */
66 ret
= sin
->pos
.parent
.event_cb(&sin
->pos
.parent
, &sin
->parent
);
69 else if (ret
== EAGAIN
)
70 /* Stream is inactive for now (live reading). */
73 fprintf(stderr
, "[error] Reading event failed.\n");
81 * Return true if a < b, false otherwise.
82 * If time stamps are exactly the same, compare by stream path. This
83 * ensures we get the same result between runs on the same trace
84 * collection on different environments.
85 * The result will be random for memory-mapped traces since there is no
86 * fixed path leading to those (they have empty path string).
/*
 * Comparison callback handed to bt_heap_init() throughout this file.
 * Both arguments are struct ctf_file_stream pointers passed as void *.
 */
88 static int stream_compare(void *a
, void *b
)
90 struct ctf_file_stream
*s_a
= a
, *s_b
= b
;
/* Primary ordering key: the real timestamp of each stream. */
92 if (s_a
->parent
.real_timestamp
< s_b
->parent
.real_timestamp
) {
94 } else if (likely(s_a
->parent
.real_timestamp
> s_b
->parent
.real_timestamp
)) {
/*
 * Neither smaller nor greater, so the timestamps are presumably equal
 * here: fall back to comparing the stream paths.
 */
97 return strcmp(s_a
->parent
.path
, s_b
->parent
.path
);
/*
 * Release an iterator position.
 * For a BT_SEEK_RESTORE position with a non-NULL u.restore, the visible
 * code handles the stream_saved_pos array (the call that receives it is
 * not visible in this excerpt, presumably an array free; TODO confirm)
 * and then releases the restore record with g_free().
 */
101 void bt_iter_free_pos(struct bt_iter_pos
*iter_pos
)
/* Only restore positions own extra heap data. */
106 if (iter_pos
->type
== BT_SEEK_RESTORE
&& iter_pos
->u
.restore
) {
107 if (iter_pos
->u
.restore
->stream_saved_pos
) {
109 iter_pos
->u
.restore
->stream_saved_pos
,
112 g_free(iter_pos
->u
.restore
);
118 * seek_file_stream_by_timestamp
120 * Browse a filestream by index, if an index contains the timestamp passed in
121 * argument, seek inside the corresponding packet until we find the event we
122 * are looking for (either the exact timestamp or the event just after the
125 * Return 0 if the seek succeeded, EOF if we didn't find any packet
126 * containing the timestamp, or a positive integer for error.
128 * TODO: this should be turned into a binary search! It is currently
129 * doing a linear search in the packets. This is an O(n) operation on a
130 * very frequent code path.
/*
 * Position one file stream on a timestamp: walk its packet index, seek to
 * a candidate packet, then read events while the stream timestamp is still
 * below the requested one.
 */
132 static int seek_file_stream_by_timestamp(struct ctf_file_stream
*cfs
,
135 struct ctf_stream_pos
*stream_pos
;
136 struct packet_index
*index
;
/* All work below goes through the position embedded in the file stream. */
139 stream_pos
= &cfs
->pos
;
/* Linear scan over the packet index entries, first to last. */
140 for (i
= 0; i
< stream_pos
->packet_index
->len
; i
++) {
141 index
= &g_array_index(stream_pos
->packet_index
,
142 struct packet_index
, i
);
/* Compare the end timestamp of this packet with the requested one. */
143 if (index
->ts_real
.timestamp_end
< timestamp
)
/* Seek to packet i, then fetch the error status of that seek. */
146 stream_pos
->packet_seek(&stream_pos
->parent
, i
, SEEK_SET
);
147 ret
= bt_packet_seek_get_error();
/* Keep reading while the stream is before the target and reads return 0. */
152 ret
= stream_read_event(cfs
);
153 } while (cfs
->parent
.real_timestamp
< timestamp
&& ret
== 0);
155 /* Can return either EOF, 0, or error (> 0). */
159 * Cannot find the timestamp within the stream packets, return
166 * seek_ctf_trace_by_timestamp : for each file stream, seek to the event with
167 * the corresponding timestamp
169 * Return 0 on success.
170 * If the timestamp is not part of any file stream, return EOF to inform the
171 * user the timestamp is out of the scope.
172 * On other errors, return positive value.
/*
 * Implementation notes, derived from the visible statements:
 *  - when the trace descriptor has interval_set, the requested timestamp
 *    is raised with max() to interval_real.timestamp_begin;
 *  - every file stream of every stream class is positioned with
 *    seek_file_stream_by_timestamp(), and a bt_heap_insert() of that
 *    stream into stream_heap follows;
 *  - the final result is 0 when found is set and EOF otherwise.
 */
174 static int seek_ctf_trace_by_timestamp(struct ctf_trace
*tin
,
175 uint64_t timestamp
, struct ptr_heap
*stream_heap
)
179 struct bt_trace_descriptor
*td
= &tin
->parent
;
181 if (td
->interval_set
) {
183 * If this trace has an interval selected, don't allow seeks
184 * before the selected interval. We seek to the start of the
185 * interval, thereby presenting a shorter "virtual" trace.
187 timestamp
= max(timestamp
, td
->interval_real
.timestamp_begin
);
190 /* for each stream_class */
191 for (i
= 0; i
< tin
->streams
->len
; i
++) {
192 struct ctf_stream_declaration
*stream_class
;
194 stream_class
= g_ptr_array_index(tin
->streams
, i
);
197 /* for each file_stream */
198 for (j
= 0; j
< stream_class
->streams
->len
; j
++) {
199 struct ctf_stream_definition
*stream
;
200 struct ctf_file_stream
*cfs
;
202 stream
= g_ptr_array_index(stream_class
->streams
, j
);
205 cfs
= container_of(stream
, struct ctf_file_stream
,
207 ret
= seek_file_stream_by_timestamp(cfs
, timestamp
);
210 ret
= bt_heap_insert(stream_heap
, cfs
);
212 /* Return positive error. */
216 } else if (ret
> 0) {
218 * Error in seek (not EOF), failure.
222 /* on EOF just do not put stream into heap. */
226 return found
? 0 : EOF
;
230 * Find timestamp of last event in the stream.
232 * Return value: 0 if OK, positive error value on error, EOF if no
/*
 * Walk the packet index of one file stream from the last packet towards
 * the first, seeking to each packet and reading events. The real timestamp
 * of the stream is recorded after reads, and the last recorded value is
 * stored through timestamp_end.
 */
235 static int find_max_timestamp_ctf_file_stream(struct ctf_file_stream
*cfs
,
236 uint64_t *timestamp_end
)
238 int ret
, count
= 0, i
;
239 uint64_t timestamp
= 0;
240 struct ctf_stream_pos
*stream_pos
;
242 stream_pos
= &cfs
->pos
;
244 * We start by the last packet, and iterate backwards until we
245 * either find at least one event, or we reach the first packet
246 * (some packets can be empty).
/*
 * NOTE(review): i is a signed int initialised from len - 1, where len is
 * the GArray length; confirm the behaviour for an empty packet index.
 */
248 for (i
= stream_pos
->packet_index
->len
- 1; i
>= 0; i
--) {
/* Seek to packet i, then fetch the error status of that seek. */
249 stream_pos
->packet_seek(&stream_pos
->parent
, i
, SEEK_SET
);
250 ret
= bt_packet_seek_get_error();
255 /* read each event until we reach the end of the stream */
257 ret
= stream_read_event(cfs
);
/* Remember the timestamp the stream carries after this read. */
260 timestamp
= cfs
->parent
.real_timestamp
;
/* Publish the last recorded timestamp to the caller. */
273 *timestamp_end
= timestamp
;
276 /* Return EOF if no events were found */
284 * Find the stream within a stream class that contains the event with
285 * the largest timestamp, and save that timestamp.
287 * Return 0 if OK, EOF if no events were found in the streams, or
288 * positive value on error.
/*
 * For every stream of the class, compute the timestamp of its last event
 * with find_max_timestamp_ctf_file_stream() and keep the largest value in
 * *max_timestamp. The comparison uses >=, so on equal timestamps the
 * stream visited later overwrites the earlier one.
 */
290 static int find_max_timestamp_ctf_stream_class(
291 struct ctf_stream_declaration
*stream_class
,
292 struct ctf_file_stream
**cfsp
,
293 uint64_t *max_timestamp
)
295 int ret
= EOF
, i
, found
= 0;
297 for (i
= 0; i
< stream_class
->streams
->len
; i
++) {
298 struct ctf_stream_definition
*stream
;
299 struct ctf_file_stream
*cfs
;
300 uint64_t current_max_ts
= 0;
302 stream
= g_ptr_array_index(stream_class
->streams
, i
);
/* Recover the enclosing file stream from the stream definition. */
305 cfs
= container_of(stream
, struct ctf_file_stream
, parent
);
/*
 * NOTE(review): the second argument below is spelled ¤t_max_ts, which
 * looks like an HTML-entity-mangled address-of current_max_ts; confirm
 * against the upstream file.
 */
306 ret
= find_max_timestamp_ctf_file_stream(cfs
, ¤t_max_ts
);
/* Keep the largest per-stream timestamp seen so far. */
311 if (current_max_ts
>= *max_timestamp
) {
312 *max_timestamp
= current_max_ts
;
/* Only EOF or a non-negative result is expected at this point. */
317 assert(ret
>= 0 || ret
== EOF
);
325 * seek_last_ctf_trace_collection: seek trace collection to last event.
327 * Return 0 if OK, EOF if no events were found, or positive error value
/*
 * Two phases are visible below: first every stream class of every trace
 * is scanned with find_max_timestamp_ctf_stream_class(), which receives
 * cfsp and the running max_timestamp; then the file stream left in *cfsp
 * is positioned with seek_file_stream_by_timestamp() on that timestamp.
 */
330 static int seek_last_ctf_trace_collection(struct trace_collection
*tc
,
331 struct ctf_file_stream
**cfsp
)
335 uint64_t max_timestamp
= 0;
340 /* For each trace in the trace_collection */
341 for (i
= 0; i
< tc
->array
->len
; i
++) {
342 struct ctf_trace
*tin
;
343 struct bt_trace_descriptor
*td_read
;
345 td_read
= g_ptr_array_index(tc
->array
, i
);
/* Recover the CTF trace that embeds this descriptor. */
348 tin
= container_of(td_read
, struct ctf_trace
, parent
);
349 /* For each stream_class in the trace */
350 for (j
= 0; j
< tin
->streams
->len
; j
++) {
351 struct ctf_stream_declaration
*stream_class
;
353 stream_class
= g_ptr_array_index(tin
->streams
, j
);
356 ret
= find_max_timestamp_ctf_stream_class(stream_class
,
357 cfsp
, &max_timestamp
);
/* Only EOF or 0 is expected at this point. */
362 assert(ret
== EOF
|| ret
== 0);
366 * Now we know in which file stream the last event is located,
367 * and we know its timestamp.
372 ret
= seek_file_stream_by_timestamp(*cfsp
, max_timestamp
);
/*
 * Move an iterator to the position described by iter_pos.
 * The function switches on iter_pos->type. In this excerpt only the
 * BT_SEEK_RESTORE label is visible; the other visible sections seek by
 * time (u.seek_time), seek every file stream through
 * babeltrace_filestream_seek(), or seek to the last event through
 * seek_last_ctf_trace_collection(). Each visible section empties the
 * stream heap and initialises it again with stream_compare before adding
 * streams back.
 */
379 int bt_iter_set_pos(struct bt_iter
*iter
, const struct bt_iter_pos
*iter_pos
)
381 struct trace_collection
*tc
;
384 if (!iter
|| !iter_pos
)
387 switch (iter_pos
->type
) {
388 case BT_SEEK_RESTORE
:
389 if (!iter_pos
->u
.restore
)
/* Drop the current heap content and start again from an empty heap. */
392 bt_heap_free(iter
->stream_heap
);
393 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
395 goto error_heap_init
;
/* Replay every saved stream position. */
397 for (i
= 0; i
< iter_pos
->u
.restore
->stream_saved_pos
->len
;
399 struct stream_saved_pos
*saved_pos
;
400 struct ctf_stream_pos
*stream_pos
;
401 struct ctf_stream_definition
*stream
;
403 saved_pos
= &g_array_index(
404 iter_pos
->u
.restore
->stream_saved_pos
,
405 struct stream_saved_pos
, i
);
406 stream
= &saved_pos
->file_stream
->parent
;
407 stream_pos
= &saved_pos
->file_stream
->pos
;
/* Reposition the stream on the saved packet index. */
409 stream_pos
->packet_seek(&stream_pos
->parent
,
410 saved_pos
->cur_index
, SEEK_SET
);
413 * the timestamp needs to be restored after
414 * packet_seek, because this function resets
415 * the timestamp to the beginning of the packet
417 stream
->real_timestamp
= saved_pos
->current_real_timestamp
;
418 stream
->cycles_timestamp
= saved_pos
->current_cycles_timestamp
;
419 stream_pos
->offset
= saved_pos
->offset
;
420 stream_pos
->last_offset
= LAST_OFFSET_POISON
;
/* Clear the current and previous real and cycles timestamp ranges. */
422 stream
->current
.real
.begin
= 0;
423 stream
->current
.real
.end
= 0;
424 stream
->current
.cycles
.begin
= 0;
425 stream
->current
.cycles
.end
= 0;
427 stream
->prev
.real
.begin
= 0;
428 stream
->prev
.real
.end
= 0;
429 stream
->prev
.cycles
.begin
= 0;
430 stream
->prev
.cycles
.end
= 0;
432 printf_debug("restored to cur_index = %" PRId64
" and "
433 "offset = %" PRId64
", timestamp = %" PRIu64
"\n",
434 stream_pos
->cur_index
,
435 stream_pos
->offset
, stream
->real_timestamp
);
/*
 * Read an event from the restored position; the stream is inserted
 * into the heap further down.
 */
437 ret
= stream_read_event(saved_pos
->file_stream
);
443 ret
= bt_heap_insert(iter
->stream_heap
,
444 saved_pos
->file_stream
);
452 bt_heap_free(iter
->stream_heap
);
453 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
455 goto error_heap_init
;
457 /* for each trace in the trace_collection */
458 for (i
= 0; i
< tc
->array
->len
; i
++) {
459 struct ctf_trace
*tin
;
460 struct bt_trace_descriptor
*td_read
;
462 td_read
= g_ptr_array_index(tc
->array
, i
);
465 tin
= container_of(td_read
, struct ctf_trace
, parent
);
467 ret
= seek_ctf_trace_by_timestamp(tin
,
468 iter_pos
->u
.seek_time
,
471 * Positive errors are failure. Negative value
472 * is EOF (for which we continue with other
473 * traces). 0 is success. Note: on EOF, it just
474 * means that no stream has been added to the
475 * iterator for that trace, which is fine.
477 if (ret
!= 0 && ret
!= EOF
)
483 bt_heap_free(iter
->stream_heap
);
484 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
486 goto error_heap_init
;
488 for (i
= 0; i
< tc
->array
->len
; i
++) {
489 struct ctf_trace
*tin
;
490 struct bt_trace_descriptor
*td_read
;
493 td_read
= g_ptr_array_index(tc
->array
, i
);
496 tin
= container_of(td_read
, struct ctf_trace
, parent
);
498 /* Populate heap with each stream */
499 for (stream_id
= 0; stream_id
< tin
->streams
->len
;
501 struct ctf_stream_declaration
*stream
;
504 stream
= g_ptr_array_index(tin
->streams
,
508 for (filenr
= 0; filenr
< stream
->streams
->len
;
510 struct ctf_file_stream
*file_stream
;
511 file_stream
= g_ptr_array_index(
516 ret
= babeltrace_filestream_seek(
517 file_stream
, iter_pos
,
519 if (ret
!= 0 && ret
!= EOF
) {
523 /* Do not add EOF streams */
526 ret
= bt_heap_insert(iter
->stream_heap
, file_stream
);
535 struct ctf_file_stream
*cfs
= NULL
;
538 ret
= seek_last_ctf_trace_collection(tc
, &cfs
);
539 if (ret
!= 0 || !cfs
)
541 /* remove all streams from the heap */
542 bt_heap_free(iter
->stream_heap
);
543 /* Create a new empty heap */
544 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
547 /* Insert the stream that contains the last event */
548 ret
= bt_heap_insert(iter
->stream_heap
, cfs
);
554 /* not implemented */
/*
 * Presumably the shared error path (its label is not visible here):
 * empty the heap and initialise it again; if that initialisation fails,
 * release the heap object and clear the pointer.
 */
561 bt_heap_free(iter
->stream_heap
);
563 if (bt_heap_init(iter
->stream_heap
, 0, stream_compare
) < 0) {
564 bt_heap_free(iter
->stream_heap
);
565 g_free(iter
->stream_heap
);
566 iter
->stream_heap
= NULL
;
/*
 * Build a BT_SEEK_RESTORE position describing the current state of every
 * stream in the iterator heap.
 * The position and its restore record are allocated zeroed with g_new0,
 * and one struct stream_saved_pos per stream is collected in a GArray.
 * The streams are enumerated from a copy of the heap (iter_heap_copy).
 */
573 struct bt_iter_pos
*bt_iter_get_pos(struct bt_iter
*iter
)
575 struct bt_iter_pos
*pos
;
576 struct trace_collection
*tc
;
577 struct ctf_file_stream
*file_stream
= NULL
, *removed
;
578 struct ptr_heap iter_heap_copy
;
585 pos
= g_new0(struct bt_iter_pos
, 1);
586 pos
->type
= BT_SEEK_RESTORE
;
587 pos
->u
.restore
= g_new0(struct bt_saved_pos
, 1);
588 pos
->u
.restore
->tc
= tc
;
589 pos
->u
.restore
->stream_saved_pos
= g_array_new(FALSE
, TRUE
,
590 sizeof(struct stream_saved_pos
));
591 if (!pos
->u
.restore
->stream_saved_pos
)
/* Entries are removed from this copy below, not from iter->stream_heap. */
594 ret
= bt_heap_copy(&iter_heap_copy
, iter
->stream_heap
);
598 /* iterate over each stream in the heap */
599 file_stream
= bt_heap_maximum(&iter_heap_copy
);
600 while (file_stream
!= NULL
) {
601 struct stream_saved_pos saved_pos
;
/* A poisoned last_offset is not expected here. */
603 assert(file_stream
->pos
.last_offset
!= LAST_OFFSET_POISON
);
/* The saved offset is taken from last_offset of the stream position. */
604 saved_pos
.offset
= file_stream
->pos
.last_offset
;
605 saved_pos
.file_stream
= file_stream
;
606 saved_pos
.cur_index
= file_stream
->pos
.cur_index
;
608 saved_pos
.current_real_timestamp
= file_stream
->parent
.real_timestamp
;
609 saved_pos
.current_cycles_timestamp
= file_stream
->parent
.cycles_timestamp
;
612 pos
->u
.restore
->stream_saved_pos
,
615 printf_debug("stream : %" PRIu64
", cur_index : %zd, "
617 "timestamp = %" PRIu64
"\n",
618 file_stream
->parent
.stream_id
,
619 saved_pos
.cur_index
, saved_pos
.offset
,
620 saved_pos
.current_real_timestamp
);
622 /* remove the stream from the heap copy */
623 removed
= bt_heap_remove(&iter_heap_copy
);
624 assert(removed
== file_stream
);
/* Continue with the next stream at the top of the copy. */
626 file_stream
= bt_heap_maximum(&iter_heap_copy
);
628 bt_heap_free(&iter_heap_copy
);
/* Presumably an error path (label not visible): free the saved array. */
632 g_array_free(pos
->u
.restore
->stream_saved_pos
, TRUE
);
/*
 * Allocate a zeroed struct bt_iter_pos with g_new0 and fill it as a
 * BT_SEEK_TIME position whose u.seek_time is the requested timestamp.
 * The iterator parameter is named unused and is not referenced in the
 * visible statements.
 */
638 struct bt_iter_pos
*bt_iter_create_time_pos(struct bt_iter
*unused
,
641 struct bt_iter_pos
*pos
;
643 pos
= g_new0(struct bt_iter_pos
, 1);
644 pos
->type
= BT_SEEK_TIME
;
645 pos
->u
.seek_time
= timestamp
;
650 * babeltrace_filestream_seek: seek a filestream to given position.
652 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
/*
 * Dispatches on begin_pos->type. One visible section seeks the stream
 * with its packet_seek callback and then reads an event; the section that
 * covers BT_SEEK_RESTORE ends in assert(0).
 */
654 static int babeltrace_filestream_seek(struct ctf_file_stream
*file_stream
,
655 const struct bt_iter_pos
*begin_pos
,
656 unsigned long stream_id
)
/* Guard against NULL arguments. */
660 if (!file_stream
|| !begin_pos
)
663 switch (begin_pos
->type
) {
666 * just insert into the heap we should already know
/* Seek within the stream, then read an event from it. */
671 file_stream
->pos
.packet_seek(&file_stream
->pos
.parent
,
673 ret
= stream_read_event(file_stream
);
676 case BT_SEEK_RESTORE
:
678 assert(0); /* Not yet defined */
/*
 * Add the file streams of one trace to the iterator heap.
 * Each file stream of each stream class is positioned through
 * babeltrace_filestream_seek() with a local BT_SEEK_BEGIN position, and a
 * bt_heap_insert() into iter->stream_heap follows.
 */
684 int bt_iter_add_trace(struct bt_iter
*iter
,
685 struct bt_trace_descriptor
*td_read
)
687 struct ctf_trace
*tin
;
688 int stream_id
, ret
= 0;
/* Recover the CTF trace that embeds this descriptor. */
690 tin
= container_of(td_read
, struct ctf_trace
, parent
);
692 /* Populate heap with each stream */
693 for (stream_id
= 0; stream_id
< tin
->streams
->len
;
695 struct ctf_stream_declaration
*stream
;
698 stream
= g_ptr_array_index(tin
->streams
, stream_id
);
701 for (filenr
= 0; filenr
< stream
->streams
->len
;
703 struct ctf_file_stream
*file_stream
;
704 struct bt_iter_pos pos
;
706 file_stream
= g_ptr_array_index(stream
->streams
,
/* In the visible statements only the type member of pos is assigned. */
711 pos
.type
= BT_SEEK_BEGIN
;
712 ret
= babeltrace_filestream_seek(file_stream
,
718 } else if (ret
!= 0 && ret
!= EAGAIN
) {
722 ret
= bt_heap_insert(iter
->stream_heap
, file_stream
);
/*
 * Initialise an iterator over the trace collection of a context.
 * Visible steps: argument checks (iter, ctx, ctx->tc, ctx->tc->array), a
 * test on ctx->current_iterator, allocation of the stream heap with
 * g_new, storing end_pos, heap initialisation with stream_compare, adding
 * every trace with bt_iter_add_trace(), registering the iterator as
 * ctx->current_iterator, and applying begin_pos through bt_iter_set_pos()
 * when its type is not BT_SEEK_BEGIN.
 */
732 int bt_iter_init(struct bt_iter
*iter
,
733 struct bt_context
*ctx
,
734 const struct bt_iter_pos
*begin_pos
,
735 const struct bt_iter_pos
*end_pos
)
740 if (!iter
|| !ctx
|| !ctx
->tc
|| !ctx
->tc
->array
)
/* The body of this branch is not visible in this excerpt. */
743 if (ctx
->current_iterator
) {
748 iter
->stream_heap
= g_new(struct ptr_heap
, 1);
749 iter
->end_pos
= end_pos
;
753 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
755 goto error_heap_init
;
/* Add every trace of the collection to the iterator. */
757 for (i
= 0; i
< ctx
->tc
->array
->len
; i
++) {
758 struct bt_trace_descriptor
*td_read
;
760 td_read
= g_ptr_array_index(ctx
->tc
->array
, i
);
763 ret
= bt_iter_add_trace(iter
, td_read
);
768 ctx
->current_iterator
= iter
;
/* A NULL or BT_SEEK_BEGIN begin_pos needs no extra seek. */
769 if (begin_pos
&& begin_pos
->type
!= BT_SEEK_BEGIN
) {
770 ret
= bt_iter_set_pos(iter
, begin_pos
);
/*
 * Presumably the error paths (labels not visible): free the heap
 * content, then the heap object, and clear the pointer.
 */
779 bt_heap_free(iter
->stream_heap
);
783 g_free(iter
->stream_heap
);
784 iter
->stream_heap
= NULL
;
/*
 * Allocate a zeroed struct bt_iter with g_new0 and initialise it with
 * bt_iter_init() using the same context and positions. How the result of
 * bt_iter_init() is handled is not visible in this excerpt.
 */
789 struct bt_iter
*bt_iter_create(struct bt_context
*ctx
,
790 const struct bt_iter_pos
*begin_pos
,
791 const struct bt_iter_pos
*end_pos
)
793 struct bt_iter
*iter
;
799 iter
= g_new0(struct bt_iter
, 1);
800 ret
= bt_iter_init(iter
, ctx
, begin_pos
, end_pos
);
/*
 * Tear down an iterator: when a stream heap is present, free its content
 * and the heap object; then clear current_iterator on the context and
 * call bt_context_put() on that context.
 */
808 void bt_iter_fini(struct bt_iter
*iter
)
811 if (iter
->stream_heap
) {
812 bt_heap_free(iter
->stream_heap
);
813 g_free(iter
->stream_heap
);
/* Unregister from the context before calling bt_context_put() on it. */
815 iter
->ctx
->current_iterator
= NULL
;
816 bt_context_put(iter
->ctx
);
/*
 * NOTE(review): only the signature is visible in this excerpt. Presumably
 * the counterpart of bt_iter_create(); TODO confirm against the full file.
 */
820 void bt_iter_destroy(struct bt_iter
*iter
)
/*
 * Advance the iterator by one event.
 * The stream at the top of the heap is read with stream_read_event().
 * On EOF, or when the event lies past the end of the interval set on the
 * trace, the stream is removed from the heap; the -ERANGE branch removes
 * it as well. EAGAIN has a dedicated branch for inactive live streams.
 * Otherwise the stream is put back with bt_heap_replace_max().
 */
827 int bt_iter_next(struct bt_iter
*iter
)
829 struct ctf_file_stream
*file_stream
, *removed
;
831 bool event_outside_interval
= false;
/* Pick the stream currently at the top of the heap. */
836 file_stream
= bt_heap_maximum(iter
->stream_heap
);
838 /* end of file for all streams */
843 ret
= stream_read_event(file_stream
);
/*
 * With an interval set on the trace, flag an event whose real timestamp
 * is greater than the end of that interval.
 */
844 if (file_stream
->pos
.parent
.trace
&&
845 file_stream
->pos
.parent
.trace
->interval_set
) {
846 event_outside_interval
=
847 file_stream
->parent
.real_timestamp
>
848 file_stream
->pos
.parent
.trace
->interval_real
.timestamp_end
;
/* EOF or out of interval: drop the stream from the heap. */
850 if (ret
== EOF
|| event_outside_interval
) {
851 removed
= bt_heap_remove(iter
->stream_heap
);
852 assert(removed
== file_stream
);
855 } else if (ret
== EAGAIN
) {
857 * Live streaming: the stream is inactive for now, we
858 * just updated the timestamp_end to skip over this
859 * stream up to a certain point in time.
861 * Since we can't guarantee that a stream will ever have
862 * any activity, we can't rely on the fact that
863 * bt_iter_next will be called for each stream and deal
864 * with inactive streams. So instead, we return 0 here
865 * to the caller and let the read API handle the
870 } else if (ret
== -ERANGE
) {
871 removed
= bt_heap_remove(iter
->stream_heap
);
872 assert(removed
== file_stream
);
879 /* Reinsert the file stream into the heap, and rebalance. */
880 removed
= bt_heap_replace_max(iter
->stream_heap
, file_stream
);
881 assert(removed
== file_stream
);