6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 #include <babeltrace/babeltrace.h>
31 #include <babeltrace/context.h>
32 #include <babeltrace/context-internal.h>
33 #include <babeltrace/iterator-internal.h>
34 #include <babeltrace/iterator.h>
35 #include <babeltrace/prio_heap.h>
36 #include <babeltrace/ctf/metadata.h>
37 #include <babeltrace/ctf/events.h>
40 static int babeltrace_filestream_seek(struct ctf_file_stream
*file_stream
,
41 const struct bt_iter_pos
*begin_pos
,
42 unsigned long stream_id
);
44 struct stream_saved_pos
{
46 * Use the file_stream pointer to check that the trace collection we
47 * restore to matches the one we saved from, for each stream.
49 struct ctf_file_stream
*file_stream
;
50 size_t cur_index
; /* current index in packet index */
51 ssize_t offset
; /* offset from base, in bits. EOF for end of file. */
52 uint64_t current_real_timestamp
;
53 uint64_t current_cycles_timestamp
;
57 struct trace_collection
*tc
;
58 GArray
*stream_saved_pos
; /* Contains struct stream_saved_pos */
61 static int stream_read_event(struct ctf_file_stream
*sin
)
65 ret
= sin
->pos
.parent
.event_cb(&sin
->pos
.parent
, &sin
->parent
);
68 else if (ret
== EAGAIN
)
69 /* Stream is inactive for now (live reading). */
72 fprintf(stderr
, "[error] Reading event failed.\n");
79 * Return true if a < b, false otherwise.
80 * If time stamps are exactly the same, compare by stream path. This
81 * ensures we get the same result between runs on the same trace
82 * collection on different environments.
83 * The result will be random for memory-mapped traces since there is no
84 * fixed path leading to those (they have empty path string).
86 static int stream_compare(void *a
, void *b
)
88 struct ctf_file_stream
*s_a
= a
, *s_b
= b
;
90 if (s_a
->parent
.real_timestamp
< s_b
->parent
.real_timestamp
) {
92 } else if (likely(s_a
->parent
.real_timestamp
> s_b
->parent
.real_timestamp
)) {
95 return strcmp(s_a
->parent
.path
, s_b
->parent
.path
);
99 void bt_iter_free_pos(struct bt_iter_pos
*iter_pos
)
104 if (iter_pos
->type
== BT_SEEK_RESTORE
&& iter_pos
->u
.restore
) {
105 if (iter_pos
->u
.restore
->stream_saved_pos
) {
107 iter_pos
->u
.restore
->stream_saved_pos
,
110 g_free(iter_pos
->u
.restore
);
116 * seek_file_stream_by_timestamp
118 * Browse a filestream by index, if an index contains the timestamp passed in
119 * argument, seek inside the corresponding packet until we find the event we
120 * are looking for (either the exact timestamp or the event just after the
123 * Return 0 if the seek succeeded, EOF if we didn't find any packet
124 * containing the timestamp, or a positive integer for error.
126 * TODO: this should be turned into a binary search! It is currently
127 * doing a linear search in the packets. This is an O(n) operation on a
128 * very frequent code path.
130 static int seek_file_stream_by_timestamp(struct ctf_file_stream
*cfs
,
133 struct ctf_stream_pos
*stream_pos
;
134 struct packet_index
*index
;
137 stream_pos
= &cfs
->pos
;
138 for (i
= 0; i
< stream_pos
->packet_index
->len
; i
++) {
139 index
= &g_array_index(stream_pos
->packet_index
,
140 struct packet_index
, i
);
141 if (index
->ts_real
.timestamp_end
< timestamp
)
144 stream_pos
->packet_seek(&stream_pos
->parent
, i
, SEEK_SET
);
146 ret
= stream_read_event(cfs
);
147 } while (cfs
->parent
.real_timestamp
< timestamp
&& ret
== 0);
149 /* Can return either EOF, 0, or error (> 0). */
153 * Cannot find the timestamp within the stream packets, return
160 * seek_ctf_trace_by_timestamp : for each file stream, seek to the event with
161 * the corresponding timestamp
163 * Return 0 on success.
164 * If the timestamp is not part of any file stream, return EOF to inform the
165 * user that the timestamp is out of scope.
166 * On other errors, return positive value.
168 static int seek_ctf_trace_by_timestamp(struct ctf_trace
*tin
,
169 uint64_t timestamp
, struct ptr_heap
*stream_heap
)
174 /* for each stream_class */
175 for (i
= 0; i
< tin
->streams
->len
; i
++) {
176 struct ctf_stream_declaration
*stream_class
;
178 stream_class
= g_ptr_array_index(tin
->streams
, i
);
181 /* for each file_stream */
182 for (j
= 0; j
< stream_class
->streams
->len
; j
++) {
183 struct ctf_stream_definition
*stream
;
184 struct ctf_file_stream
*cfs
;
186 stream
= g_ptr_array_index(stream_class
->streams
, j
);
189 cfs
= container_of(stream
, struct ctf_file_stream
,
191 ret
= seek_file_stream_by_timestamp(cfs
, timestamp
);
194 ret
= bt_heap_insert(stream_heap
, cfs
);
196 /* Return positive error. */
200 } else if (ret
> 0) {
202 * Error in seek (not EOF), failure.
206 /* on EOF just do not put stream into heap. */
210 return found
? 0 : EOF
;
214 * Find timestamp of last event in the stream.
216 * Return value: 0 if OK, positive error value on error, EOF if no
219 static int find_max_timestamp_ctf_file_stream(struct ctf_file_stream
*cfs
,
220 uint64_t *timestamp_end
)
222 int ret
, count
= 0, i
;
223 uint64_t timestamp
= 0;
224 struct ctf_stream_pos
*stream_pos
;
226 stream_pos
= &cfs
->pos
;
228 * We start with the last packet, and iterate backwards until we
229 * either find at least one event, or we reach the first packet
230 * (some packets can be empty).
232 for (i
= stream_pos
->packet_index
->len
- 1; i
>= 0; i
--) {
233 stream_pos
->packet_seek(&stream_pos
->parent
, i
, SEEK_SET
);
235 /* read each event until we reach the end of the stream */
237 ret
= stream_read_event(cfs
);
240 timestamp
= cfs
->parent
.real_timestamp
;
253 *timestamp_end
= timestamp
;
256 /* Return EOF if no events were found */
264 * Find the stream within a stream class that contains the event with
265 * the largest timestamp, and save that timestamp.
267 * Return 0 if OK, EOF if no events were found in the streams, or
268 * positive value on error.
270 static int find_max_timestamp_ctf_stream_class(
271 struct ctf_stream_declaration
*stream_class
,
272 struct ctf_file_stream
**cfsp
,
273 uint64_t *max_timestamp
)
275 int ret
= EOF
, i
, found
= 0;
277 for (i
= 0; i
< stream_class
->streams
->len
; i
++) {
278 struct ctf_stream_definition
*stream
;
279 struct ctf_file_stream
*cfs
;
280 uint64_t current_max_ts
= 0;
282 stream
= g_ptr_array_index(stream_class
->streams
, i
);
285 cfs
= container_of(stream
, struct ctf_file_stream
, parent
);
286 ret
= find_max_timestamp_ctf_file_stream(cfs
, ¤t_max_ts
);
291 if (current_max_ts
>= *max_timestamp
) {
292 *max_timestamp
= current_max_ts
;
297 assert(ret
>= 0 || ret
== EOF
);
305 * seek_last_ctf_trace_collection: seek trace collection to last event.
307 * Return 0 if OK, EOF if no events were found, or positive error value
310 static int seek_last_ctf_trace_collection(struct trace_collection
*tc
,
311 struct ctf_file_stream
**cfsp
)
315 uint64_t max_timestamp
= 0;
320 /* For each trace in the trace_collection */
321 for (i
= 0; i
< tc
->array
->len
; i
++) {
322 struct ctf_trace
*tin
;
323 struct bt_trace_descriptor
*td_read
;
325 td_read
= g_ptr_array_index(tc
->array
, i
);
328 tin
= container_of(td_read
, struct ctf_trace
, parent
);
329 /* For each stream_class in the trace */
330 for (j
= 0; j
< tin
->streams
->len
; j
++) {
331 struct ctf_stream_declaration
*stream_class
;
333 stream_class
= g_ptr_array_index(tin
->streams
, j
);
336 ret
= find_max_timestamp_ctf_stream_class(stream_class
,
337 cfsp
, &max_timestamp
);
342 assert(ret
== EOF
|| ret
== 0);
346 * Now we know in which file stream the last event is located,
347 * and we know its timestamp.
352 ret
= seek_file_stream_by_timestamp(*cfsp
, max_timestamp
);
359 int bt_iter_set_pos(struct bt_iter
*iter
, const struct bt_iter_pos
*iter_pos
)
361 struct trace_collection
*tc
;
364 if (!iter
|| !iter_pos
)
367 switch (iter_pos
->type
) {
368 case BT_SEEK_RESTORE
:
369 if (!iter_pos
->u
.restore
)
372 bt_heap_free(iter
->stream_heap
);
373 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
375 goto error_heap_init
;
377 for (i
= 0; i
< iter_pos
->u
.restore
->stream_saved_pos
->len
;
379 struct stream_saved_pos
*saved_pos
;
380 struct ctf_stream_pos
*stream_pos
;
381 struct ctf_stream_definition
*stream
;
383 saved_pos
= &g_array_index(
384 iter_pos
->u
.restore
->stream_saved_pos
,
385 struct stream_saved_pos
, i
);
386 stream
= &saved_pos
->file_stream
->parent
;
387 stream_pos
= &saved_pos
->file_stream
->pos
;
389 stream_pos
->packet_seek(&stream_pos
->parent
,
390 saved_pos
->cur_index
, SEEK_SET
);
393 * the timestamp needs to be restored after
394 * packet_seek, because this function resets
395 * the timestamp to the beginning of the packet
397 stream
->real_timestamp
= saved_pos
->current_real_timestamp
;
398 stream
->cycles_timestamp
= saved_pos
->current_cycles_timestamp
;
399 stream_pos
->offset
= saved_pos
->offset
;
400 stream_pos
->last_offset
= LAST_OFFSET_POISON
;
402 stream
->prev_real_timestamp
= 0;
403 stream
->prev_real_timestamp_end
= 0;
404 stream
->prev_cycles_timestamp
= 0;
405 stream
->prev_cycles_timestamp_end
= 0;
407 printf_debug("restored to cur_index = %" PRId64
" and "
408 "offset = %" PRId64
", timestamp = %" PRIu64
"\n",
409 stream_pos
->cur_index
,
410 stream_pos
->offset
, stream
->real_timestamp
);
412 ret
= stream_read_event(saved_pos
->file_stream
);
418 ret
= bt_heap_insert(iter
->stream_heap
,
419 saved_pos
->file_stream
);
427 bt_heap_free(iter
->stream_heap
);
428 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
430 goto error_heap_init
;
432 /* for each trace in the trace_collection */
433 for (i
= 0; i
< tc
->array
->len
; i
++) {
434 struct ctf_trace
*tin
;
435 struct bt_trace_descriptor
*td_read
;
437 td_read
= g_ptr_array_index(tc
->array
, i
);
440 tin
= container_of(td_read
, struct ctf_trace
, parent
);
442 ret
= seek_ctf_trace_by_timestamp(tin
,
443 iter_pos
->u
.seek_time
,
446 * Positive errors are failure. Negative value
447 * is EOF (for which we continue with other
448 * traces). 0 is success. Note: on EOF, it just
449 * means that no stream has been added to the
450 * iterator for that trace, which is fine.
452 if (ret
!= 0 && ret
!= EOF
)
458 bt_heap_free(iter
->stream_heap
);
459 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
461 goto error_heap_init
;
463 for (i
= 0; i
< tc
->array
->len
; i
++) {
464 struct ctf_trace
*tin
;
465 struct bt_trace_descriptor
*td_read
;
468 td_read
= g_ptr_array_index(tc
->array
, i
);
471 tin
= container_of(td_read
, struct ctf_trace
, parent
);
473 /* Populate heap with each stream */
474 for (stream_id
= 0; stream_id
< tin
->streams
->len
;
476 struct ctf_stream_declaration
*stream
;
479 stream
= g_ptr_array_index(tin
->streams
,
483 for (filenr
= 0; filenr
< stream
->streams
->len
;
485 struct ctf_file_stream
*file_stream
;
486 file_stream
= g_ptr_array_index(
491 ret
= babeltrace_filestream_seek(
492 file_stream
, iter_pos
,
494 if (ret
!= 0 && ret
!= EOF
) {
498 /* Do not add EOF streams */
501 ret
= bt_heap_insert(iter
->stream_heap
, file_stream
);
510 struct ctf_file_stream
*cfs
= NULL
;
513 ret
= seek_last_ctf_trace_collection(tc
, &cfs
);
514 if (ret
!= 0 || !cfs
)
516 /* remove all streams from the heap */
517 bt_heap_free(iter
->stream_heap
);
518 /* Create a new empty heap */
519 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
522 /* Insert the stream that contains the last event */
523 ret
= bt_heap_insert(iter
->stream_heap
, cfs
);
529 /* not implemented */
536 bt_heap_free(iter
->stream_heap
);
538 if (bt_heap_init(iter
->stream_heap
, 0, stream_compare
) < 0) {
539 bt_heap_free(iter
->stream_heap
);
540 g_free(iter
->stream_heap
);
541 iter
->stream_heap
= NULL
;
548 struct bt_iter_pos
*bt_iter_get_pos(struct bt_iter
*iter
)
550 struct bt_iter_pos
*pos
;
551 struct trace_collection
*tc
;
552 struct ctf_file_stream
*file_stream
= NULL
, *removed
;
553 struct ptr_heap iter_heap_copy
;
560 pos
= g_new0(struct bt_iter_pos
, 1);
561 pos
->type
= BT_SEEK_RESTORE
;
562 pos
->u
.restore
= g_new0(struct bt_saved_pos
, 1);
563 pos
->u
.restore
->tc
= tc
;
564 pos
->u
.restore
->stream_saved_pos
= g_array_new(FALSE
, TRUE
,
565 sizeof(struct stream_saved_pos
));
566 if (!pos
->u
.restore
->stream_saved_pos
)
569 ret
= bt_heap_copy(&iter_heap_copy
, iter
->stream_heap
);
573 /* iterate over each stream in the heap */
574 file_stream
= bt_heap_maximum(&iter_heap_copy
);
575 while (file_stream
!= NULL
) {
576 struct stream_saved_pos saved_pos
;
578 assert(file_stream
->pos
.last_offset
!= LAST_OFFSET_POISON
);
579 saved_pos
.offset
= file_stream
->pos
.last_offset
;
580 saved_pos
.file_stream
= file_stream
;
581 saved_pos
.cur_index
= file_stream
->pos
.cur_index
;
583 saved_pos
.current_real_timestamp
= file_stream
->parent
.real_timestamp
;
584 saved_pos
.current_cycles_timestamp
= file_stream
->parent
.cycles_timestamp
;
587 pos
->u
.restore
->stream_saved_pos
,
590 printf_debug("stream : %" PRIu64
", cur_index : %zd, "
592 "timestamp = %" PRIu64
"\n",
593 file_stream
->parent
.stream_id
,
594 saved_pos
.cur_index
, saved_pos
.offset
,
595 saved_pos
.current_real_timestamp
);
597 /* remove the stream from the heap copy */
598 removed
= bt_heap_remove(&iter_heap_copy
);
599 assert(removed
== file_stream
);
601 file_stream
= bt_heap_maximum(&iter_heap_copy
);
603 bt_heap_free(&iter_heap_copy
);
607 g_array_free(pos
->u
.restore
->stream_saved_pos
, TRUE
);
613 struct bt_iter_pos
*bt_iter_create_time_pos(struct bt_iter
*iter
,
616 struct bt_iter_pos
*pos
;
621 pos
= g_new0(struct bt_iter_pos
, 1);
622 pos
->type
= BT_SEEK_TIME
;
623 pos
->u
.seek_time
= timestamp
;
628 * babeltrace_filestream_seek: seek a filestream to given position.
630 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
632 static int babeltrace_filestream_seek(struct ctf_file_stream
*file_stream
,
633 const struct bt_iter_pos
*begin_pos
,
634 unsigned long stream_id
)
638 if (!file_stream
|| !begin_pos
)
641 switch (begin_pos
->type
) {
644 * just insert into the heap we should already know
649 file_stream
->pos
.packet_seek(&file_stream
->pos
.parent
,
651 ret
= stream_read_event(file_stream
);
654 case BT_SEEK_RESTORE
:
656 assert(0); /* Not yet defined */
662 int bt_iter_init(struct bt_iter
*iter
,
663 struct bt_context
*ctx
,
664 const struct bt_iter_pos
*begin_pos
,
665 const struct bt_iter_pos
*end_pos
)
673 if (ctx
->current_iterator
) {
678 iter
->stream_heap
= g_new(struct ptr_heap
, 1);
679 iter
->end_pos
= end_pos
;
683 ret
= bt_heap_init(iter
->stream_heap
, 0, stream_compare
);
685 goto error_heap_init
;
687 for (i
= 0; i
< ctx
->tc
->array
->len
; i
++) {
688 struct ctf_trace
*tin
;
689 struct bt_trace_descriptor
*td_read
;
691 td_read
= g_ptr_array_index(ctx
->tc
->array
, i
);
694 tin
= container_of(td_read
, struct ctf_trace
, parent
);
696 /* Populate heap with each stream */
697 for (stream_id
= 0; stream_id
< tin
->streams
->len
;
699 struct ctf_stream_declaration
*stream
;
702 stream
= g_ptr_array_index(tin
->streams
, stream_id
);
705 for (filenr
= 0; filenr
< stream
->streams
->len
;
707 struct ctf_file_stream
*file_stream
;
708 struct bt_iter_pos pos
;
710 file_stream
= g_ptr_array_index(stream
->streams
,
715 pos
.type
= BT_SEEK_BEGIN
;
716 ret
= babeltrace_filestream_seek(file_stream
,
722 } else if (ret
!= 0 && ret
!= EAGAIN
) {
726 ret
= bt_heap_insert(iter
->stream_heap
, file_stream
);
733 ctx
->current_iterator
= iter
;
734 if (begin_pos
&& begin_pos
->type
!= BT_SEEK_BEGIN
) {
735 ret
= bt_iter_set_pos(iter
, begin_pos
);
741 bt_heap_free(iter
->stream_heap
);
743 g_free(iter
->stream_heap
);
744 iter
->stream_heap
= NULL
;
749 struct bt_iter
*bt_iter_create(struct bt_context
*ctx
,
750 const struct bt_iter_pos
*begin_pos
,
751 const struct bt_iter_pos
*end_pos
)
753 struct bt_iter
*iter
;
759 iter
= g_new0(struct bt_iter
, 1);
760 ret
= bt_iter_init(iter
, ctx
, begin_pos
, end_pos
);
768 void bt_iter_fini(struct bt_iter
*iter
)
771 if (iter
->stream_heap
) {
772 bt_heap_free(iter
->stream_heap
);
773 g_free(iter
->stream_heap
);
775 iter
->ctx
->current_iterator
= NULL
;
776 bt_context_put(iter
->ctx
);
779 void bt_iter_destroy(struct bt_iter
*iter
)
786 int bt_iter_next(struct bt_iter
*iter
)
788 struct ctf_file_stream
*file_stream
, *removed
;
794 file_stream
= bt_heap_maximum(iter
->stream_heap
);
796 /* end of file for all streams */
801 ret
= stream_read_event(file_stream
);
803 removed
= bt_heap_remove(iter
->stream_heap
);
804 assert(removed
== file_stream
);
807 } else if (ret
== EAGAIN
) {
809 * Live streaming: the stream is inactive for now, we
810 * just updated the timestamp_end to skip over this
811 * stream up to a certain point in time.
813 * Since we can't guarantee that a stream will ever have
814 * any activity, we can't rely on the fact that
815 * bt_iter_next will be called for each stream and deal
816 * with inactive streams. So instead, we return 0 here
817 * to the caller and let the read API handle the
827 /* Reinsert the file stream into the heap, and rebalance. */
828 removed
= bt_heap_replace_max(iter
->stream_heap
, file_stream
);
829 assert(removed
== file_stream
);