X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=lib%2Fiterator.c;h=0ae2a1ba1b265451f50f1c9e0c44d2f314248767;hp=4d201dc48942d911edc5647db48837869fcb526a;hb=90fcbacc6eb672f0d5e0c9d6722d397ac9f1a63e;hpb=d40a9bb4c13a7ab2be5482d9632346f426229149 diff --git a/lib/iterator.c b/lib/iterator.c index 4d201dc4..0ae2a1ba 100644 --- a/lib/iterator.c +++ b/lib/iterator.c @@ -26,6 +26,7 @@ #include #include #include +#include struct stream_saved_pos { /* @@ -35,6 +36,7 @@ struct stream_saved_pos { struct ctf_file_stream *file_stream; size_t cur_index; /* current index in packet index */ ssize_t offset; /* offset from base, in bits. EOF for end of file. */ + uint64_t current_timestamp; }; struct bt_saved_pos { @@ -69,6 +71,184 @@ int stream_compare(void *a, void *b) return 0; } +void bt_iter_free_pos(struct bt_iter_pos *iter_pos) +{ + if (iter_pos) { + if (iter_pos->u.restore) { + if (iter_pos->u.restore->stream_saved_pos) { + g_array_free( + iter_pos->u.restore->stream_saved_pos, + TRUE); + } + g_free(iter_pos->u.restore); + } + g_free(iter_pos); + } +} + +int bt_iter_set_pos(struct bt_iter *iter, const struct bt_iter_pos *iter_pos) +{ + int i, ret; + + if (iter_pos->u.restore) { + /* clear and recreate the heap */ + heap_free(iter->stream_heap); + g_free(iter->stream_heap); + iter->stream_heap = g_new(struct ptr_heap, 1); + ret = heap_init(iter->stream_heap, 0, stream_compare); + if (ret < 0) + goto error_heap_init; + + for (i = 0; i < iter_pos->u.restore->stream_saved_pos->len; + i++) { + struct stream_saved_pos *saved_pos; + struct ctf_stream_pos *stream_pos; + struct packet_index *index; + struct ctf_stream *stream; + + saved_pos = &g_array_index( + iter_pos->u.restore->stream_saved_pos, + struct stream_saved_pos, i); + stream = &saved_pos->file_stream->parent; + stream_pos = &saved_pos->file_stream->pos; + index = &g_array_index(stream_pos->packet_index, + struct packet_index, + saved_pos->cur_index); + + stream_pos->cur_index = saved_pos->cur_index; + stream_pos->move_pos_slow(stream_pos, index->offset, + SEEK_SET); + + /* + * the timestamp needs to be restored after + * move_pos_slow, because this function resets + * the timestamp to the beginning of the packet + */ + stream->timestamp = saved_pos->current_timestamp; + stream_pos->offset = saved_pos->offset; + stream_pos->last_offset = saved_pos->offset; + + stream->prev_timestamp = 0; + stream->prev_timestamp_end = 0; + stream->consumed = 0; + + printf_debug("restored to cur_index = %zd and " + "offset = %zd, timestamp = %" PRIu64 "\n", + stream_pos->cur_index, + stream_pos->offset, stream->timestamp); + + stream_read_event(saved_pos->file_stream); + + /* Add to heap */ + ret = heap_insert(iter->stream_heap, + saved_pos->file_stream); + if (ret) + goto error; + } + } else if (iter_pos->u.seek_time) { + /* not yet implemented */ + return -1; + } else { + /* nowhere to seek to */ + return -1; + } + + return 0; +error: + heap_free(iter->stream_heap); +error_heap_init: + g_free(iter->stream_heap); + return -1; +} + +struct bt_iter_pos *bt_iter_get_pos(struct bt_iter *iter) +{ + struct bt_iter_pos *pos; + struct trace_collection *tc = iter->ctx->tc; + int i, stream_class_id, stream_id; + + pos = g_new0(struct bt_iter_pos, 1); + if (!pos) { + perror("allocating bt_iter_pos"); + goto error; + } + + pos->u.restore = g_new0(struct bt_saved_pos, 1); + if (!pos->u.restore) { + perror("allocating bt_saved_pos"); + goto error; + } + + pos->u.restore->tc = tc; + pos->u.restore->stream_saved_pos = g_array_new(FALSE, TRUE, + sizeof(struct stream_saved_pos)); + if (!pos->u.restore->stream_saved_pos) + goto error; + + for (i = 0; i < tc->array->len; i++) { + struct ctf_trace *tin; + struct trace_descriptor *td_read; + + td_read = g_ptr_array_index(tc->array, i); + tin = container_of(td_read, struct ctf_trace, parent); + + for (stream_class_id = 0; stream_class_id < tin->streams->len; + stream_class_id++) { + struct ctf_stream_class *stream_class; + + stream_class = g_ptr_array_index(tin->streams, + stream_class_id); + for (stream_id = 0; + stream_id < stream_class->streams->len; + stream_id++) { + struct ctf_stream *stream; + struct ctf_file_stream *cfs; + struct stream_saved_pos saved_pos; + + stream = g_ptr_array_index( + stream_class->streams, + stream_id); + cfs = container_of(stream, + struct ctf_file_stream, + parent); + + saved_pos.file_stream = cfs; + saved_pos.cur_index = cfs->pos.cur_index; + + /* + * It is possible that an event was read during + * the last restore, never consumed and its + * position saved again. For this case, we + * need to check if the event really was + * consumed by the caller otherwise it is lost. + */ + if (stream->consumed) + saved_pos.offset = cfs->pos.offset; + else + saved_pos.offset = cfs->pos.last_offset; + + saved_pos.current_timestamp = stream->timestamp; + + g_array_append_val( + pos->u.restore->stream_saved_pos, + saved_pos); + + printf_debug("stream : %lu, cur_index : %zd, " + "offset : %zd, " + "timestamp = %" PRIu64 "\n", + stream->stream_id, saved_pos.cur_index, + saved_pos.offset, + saved_pos.current_timestamp); + } + } + } + + return pos; + +error: + return NULL; +} + /* * babeltrace_filestream_seek: seek a filestream to given position. * @@ -150,7 +330,7 @@ struct bt_iter *bt_iter_create(struct bt_context *ctx, int ret = 0; struct bt_iter *iter; - iter = malloc(sizeof(struct bt_iter)); + iter = g_new0(struct bt_iter, 1); if (!iter) goto error_malloc; iter->stream_heap = g_new(struct ptr_heap, 1); @@ -215,7 +395,7 @@ error: error_heap_init: g_free(iter->stream_heap); error_ctx: - free(iter); + g_free(iter); error_malloc: return NULL; } @@ -251,7 +431,7 @@ void bt_iter_destroy(struct bt_iter *iter) bt_context_put(iter->ctx); - free(iter); + g_free(iter); } int bt_iter_next(struct bt_iter *iter)