X-Git-Url: https://git.efficios.com/?a=blobdiff_plain;f=lib%2Fiterator.c;h=639a2d290a5abc5aeabc94148c772c6b3dc9dc67;hb=HEAD;hp=a06d66bdd34790feaea7a2ca477b59b0b89fd93a;hpb=4b8de1531a2a5b5ef8ca87ad104567de4f74f469;p=babeltrace.git diff --git a/lib/iterator.c b/lib/iterator.c deleted file mode 100644 index a06d66bd..00000000 --- a/lib/iterator.c +++ /dev/null @@ -1,792 +0,0 @@ -/* - * iterator.c - * - * Babeltrace Library - * - * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation - * - * Author: Mathieu Desnoyers - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream, - const struct bt_iter_pos *begin_pos, - unsigned long stream_id); - -struct stream_saved_pos { - /* - * Use file_stream pointer to check if the trace collection we - * restore to match the one we saved from, for each stream. - */ - struct ctf_file_stream *file_stream; - size_t cur_index; /* current index in packet index */ - ssize_t offset; /* offset from base, in bits. EOF for end of file. */ - uint64_t current_real_timestamp; - uint64_t current_cycles_timestamp; -}; - -struct bt_saved_pos { - struct trace_collection *tc; - GArray *stream_saved_pos; /* Contains struct stream_saved_pos */ -}; - -static int stream_read_event(struct ctf_file_stream *sin) -{ - int ret; - - ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent); - if (ret == EOF) - return EOF; - else if (ret) { - fprintf(stderr, "[error] Reading event failed.\n"); - return ret; - } - return 0; -} - -/* - * returns true if a < b, false otherwise. - */ -static int stream_compare(void *a, void *b) -{ - struct ctf_file_stream *s_a = a, *s_b = b; - - if (s_a->parent.real_timestamp < s_b->parent.real_timestamp) - return 1; - else - return 0; -} - -void bt_iter_free_pos(struct bt_iter_pos *iter_pos) -{ - if (!iter_pos) - return; - - if (iter_pos->type == BT_SEEK_RESTORE && iter_pos->u.restore) { - if (iter_pos->u.restore->stream_saved_pos) { - g_array_free( - iter_pos->u.restore->stream_saved_pos, - TRUE); - } - g_free(iter_pos->u.restore); - } - g_free(iter_pos); -} - -/* - * seek_file_stream_by_timestamp - * - * Browse a filestream by index, if an index contains the timestamp passed in - * argument, seek inside the corresponding packet it until we find the event we - * are looking for (either the exact timestamp or the event just after the - * timestamp). - * - * Return 0 if the seek succeded, EOF if we didn't find any packet - * containing the timestamp, or a positive integer for error. - * - * TODO: this should be turned into a binary search! It is currently - * doing a linear search in the packets. This is a O(n) operation on a - * very frequent code path. - */ -static int seek_file_stream_by_timestamp(struct ctf_file_stream *cfs, - uint64_t timestamp) -{ - struct ctf_stream_pos *stream_pos; - struct packet_index *index; - int i, ret; - - stream_pos = &cfs->pos; - for (i = 0; i < stream_pos->packet_real_index->len; i++) { - index = &g_array_index(stream_pos->packet_real_index, - struct packet_index, i); - if (index->timestamp_end < timestamp) - continue; - - stream_pos->packet_seek(&stream_pos->parent, i, SEEK_SET); - do { - ret = stream_read_event(cfs); - } while (cfs->parent.real_timestamp < timestamp && ret == 0); - - /* Can return either EOF, 0, or error (> 0). */ - return ret; - } - /* - * Cannot find the timestamp within the stream packets, return - * EOF. - */ - return EOF; -} - -/* - * seek_ctf_trace_by_timestamp : for each file stream, seek to the event with - * the corresponding timestamp - * - * Return 0 on success. - * If the timestamp is not part of any file stream, return EOF to inform the - * user the timestamp is out of the scope. - * On other errors, return positive value. - */ -static int seek_ctf_trace_by_timestamp(struct ctf_trace *tin, - uint64_t timestamp, struct ptr_heap *stream_heap) -{ - int i, j, ret; - int found = 0; - - /* for each stream_class */ - for (i = 0; i < tin->streams->len; i++) { - struct ctf_stream_declaration *stream_class; - - stream_class = g_ptr_array_index(tin->streams, i); - if (!stream_class) - continue; - /* for each file_stream */ - for (j = 0; j < stream_class->streams->len; j++) { - struct ctf_stream_definition *stream; - struct ctf_file_stream *cfs; - - stream = g_ptr_array_index(stream_class->streams, j); - if (!stream) - continue; - cfs = container_of(stream, struct ctf_file_stream, - parent); - ret = seek_file_stream_by_timestamp(cfs, timestamp); - if (ret == 0) { - /* Add to heap */ - ret = heap_insert(stream_heap, cfs); - if (ret) { - /* Return positive error. */ - return -ret; - } - found = 1; - } else if (ret > 0) { - /* - * Error in seek (not EOF), failure. - */ - return ret; - } - /* on EOF just do not put stream into heap. */ - } - } - - return found ? 0 : EOF; -} - -/* - * Find timestamp of last event in the stream. - * - * Return value: 0 if OK, positive error value on error, EOF if no - * events were found. - */ -static int find_max_timestamp_ctf_file_stream(struct ctf_file_stream *cfs, - uint64_t *timestamp_end) -{ - int ret, count = 0, i; - uint64_t timestamp = 0; - struct ctf_stream_pos *stream_pos; - - stream_pos = &cfs->pos; - /* - * We start by the last packet, and iterate backwards until we - * either find at least one event, or we reach the first packet - * (some packets can be empty). - */ - for (i = stream_pos->packet_real_index->len - 1; i >= 0; i--) { - stream_pos->packet_seek(&stream_pos->parent, i, SEEK_SET); - count = 0; - /* read each event until we reach the end of the stream */ - do { - ret = stream_read_event(cfs); - if (ret == 0) { - count++; - timestamp = cfs->parent.real_timestamp; - } - } while (ret == 0); - - /* Error */ - if (ret > 0) - goto end; - assert(ret == EOF); - if (count) - break; - } - - if (count) { - *timestamp_end = timestamp; - ret = 0; - } else { - /* Return EOF if no events were found */ - ret = EOF; - } -end: - return ret; -} - -/* - * Find the stream within a stream class that contains the event with - * the largest timestamp, and save that timestamp. - * - * Return 0 if OK, EOF if no events were found in the streams, or - * positive value on error. - */ -static int find_max_timestamp_ctf_stream_class( - struct ctf_stream_declaration *stream_class, - struct ctf_file_stream **cfsp, - uint64_t *max_timestamp) -{ - int ret = EOF, i; - - for (i = 0; i < stream_class->streams->len; i++) { - struct ctf_stream_definition *stream; - struct ctf_file_stream *cfs; - uint64_t current_max_ts = 0; - - stream = g_ptr_array_index(stream_class->streams, i); - if (!stream) - continue; - cfs = container_of(stream, struct ctf_file_stream, parent); - ret = find_max_timestamp_ctf_file_stream(cfs, ¤t_max_ts); - if (ret == EOF) - continue; - if (ret != 0) - break; - if (current_max_ts >= *max_timestamp) { - *max_timestamp = current_max_ts; - *cfsp = cfs; - } - } - assert(ret >= 0 || ret == EOF); - return ret; -} - -/* - * seek_last_ctf_trace_collection: seek trace collection to last event. - * - * Return 0 if OK, EOF if no events were found, or positive error value - * on error. - */ -static int seek_last_ctf_trace_collection(struct trace_collection *tc, - struct ctf_file_stream **cfsp) -{ - int i, j, ret; - int found = 0; - uint64_t max_timestamp = 0; - - if (!tc) - return 1; - - /* For each trace in the trace_collection */ - for (i = 0; i < tc->array->len; i++) { - struct ctf_trace *tin; - struct trace_descriptor *td_read; - - td_read = g_ptr_array_index(tc->array, i); - if (!td_read) - continue; - tin = container_of(td_read, struct ctf_trace, parent); - /* For each stream_class in the trace */ - for (j = 0; j < tin->streams->len; j++) { - struct ctf_stream_declaration *stream_class; - - stream_class = g_ptr_array_index(tin->streams, j); - if (!stream_class) - continue; - ret = find_max_timestamp_ctf_stream_class(stream_class, - cfsp, &max_timestamp); - if (ret > 0) - goto end; - if (ret == 0) - found = 1; - assert(ret == EOF || ret == 0); - } - } - /* - * Now we know in which file stream the last event is located, - * and we know its timestamp. - */ - if (!found) { - ret = EOF; - } else { - ret = seek_file_stream_by_timestamp(*cfsp, max_timestamp); - assert(ret == 0); - } -end: - return ret; -} - -int bt_iter_set_pos(struct bt_iter *iter, const struct bt_iter_pos *iter_pos) -{ - struct trace_collection *tc; - int i, ret; - - if (!iter || !iter_pos) - return -EINVAL; - - switch (iter_pos->type) { - case BT_SEEK_RESTORE: - if (!iter_pos->u.restore) - return -EINVAL; - - heap_free(iter->stream_heap); - ret = heap_init(iter->stream_heap, 0, stream_compare); - if (ret < 0) - goto error_heap_init; - - for (i = 0; i < iter_pos->u.restore->stream_saved_pos->len; - i++) { - struct stream_saved_pos *saved_pos; - struct ctf_stream_pos *stream_pos; - struct ctf_stream_definition *stream; - - saved_pos = &g_array_index( - iter_pos->u.restore->stream_saved_pos, - struct stream_saved_pos, i); - stream = &saved_pos->file_stream->parent; - stream_pos = &saved_pos->file_stream->pos; - - stream_pos->packet_seek(&stream_pos->parent, - saved_pos->cur_index, SEEK_SET); - - /* - * the timestamp needs to be restored after - * packet_seek, because this function resets - * the timestamp to the beginning of the packet - */ - stream->real_timestamp = saved_pos->current_real_timestamp; - stream->cycles_timestamp = saved_pos->current_cycles_timestamp; - stream_pos->offset = saved_pos->offset; - stream_pos->last_offset = LAST_OFFSET_POISON; - - stream->prev_real_timestamp = 0; - stream->prev_real_timestamp_end = 0; - stream->prev_cycles_timestamp = 0; - stream->prev_cycles_timestamp_end = 0; - - printf_debug("restored to cur_index = %zd and " - "offset = %zd, timestamp = %" PRIu64 "\n", - stream_pos->cur_index, - stream_pos->offset, stream->real_timestamp); - - ret = stream_read_event(saved_pos->file_stream); - if (ret != 0) { - goto error; - } - - /* Add to heap */ - ret = heap_insert(iter->stream_heap, - saved_pos->file_stream); - if (ret) - goto error; - } - return 0; - case BT_SEEK_TIME: - tc = iter->ctx->tc; - - heap_free(iter->stream_heap); - ret = heap_init(iter->stream_heap, 0, stream_compare); - if (ret < 0) - goto error_heap_init; - - /* for each trace in the trace_collection */ - for (i = 0; i < tc->array->len; i++) { - struct ctf_trace *tin; - struct trace_descriptor *td_read; - - td_read = g_ptr_array_index(tc->array, i); - if (!td_read) - continue; - tin = container_of(td_read, struct ctf_trace, parent); - - ret = seek_ctf_trace_by_timestamp(tin, - iter_pos->u.seek_time, - iter->stream_heap); - /* - * Positive errors are failure. Negative value - * is EOF (for which we continue with other - * traces). 0 is success. Note: on EOF, it just - * means that no stream has been added to the - * iterator for that trace, which is fine. - */ - if (ret != 0 && ret != EOF) - goto error; - } - return 0; - case BT_SEEK_BEGIN: - tc = iter->ctx->tc; - heap_free(iter->stream_heap); - ret = heap_init(iter->stream_heap, 0, stream_compare); - if (ret < 0) - goto error_heap_init; - - for (i = 0; i < tc->array->len; i++) { - struct ctf_trace *tin; - struct trace_descriptor *td_read; - int stream_id; - - td_read = g_ptr_array_index(tc->array, i); - if (!td_read) - continue; - tin = container_of(td_read, struct ctf_trace, parent); - - /* Populate heap with each stream */ - for (stream_id = 0; stream_id < tin->streams->len; - stream_id++) { - struct ctf_stream_declaration *stream; - int filenr; - - stream = g_ptr_array_index(tin->streams, - stream_id); - if (!stream) - continue; - for (filenr = 0; filenr < stream->streams->len; - filenr++) { - struct ctf_file_stream *file_stream; - file_stream = g_ptr_array_index( - stream->streams, - filenr); - if (!file_stream) - continue; - ret = babeltrace_filestream_seek( - file_stream, iter_pos, - stream_id); - if (ret != 0 && ret != EOF) { - goto error; - } - ret = heap_insert(iter->stream_heap, file_stream); - if (ret) - goto error; - } - } - } - break; - case BT_SEEK_LAST: - { - struct ctf_file_stream *cfs = NULL; - - tc = iter->ctx->tc; - ret = seek_last_ctf_trace_collection(tc, &cfs); - if (ret != 0 || !cfs) - goto error; - /* remove all streams from the heap */ - heap_free(iter->stream_heap); - /* Create a new empty heap */ - ret = heap_init(iter->stream_heap, 0, stream_compare); - if (ret < 0) - goto error; - /* Insert the stream that contains the last event */ - ret = heap_insert(iter->stream_heap, cfs); - if (ret) - goto error; - break; - } - default: - /* not implemented */ - return -EINVAL; - } - - return 0; - -error: - heap_free(iter->stream_heap); -error_heap_init: - if (heap_init(iter->stream_heap, 0, stream_compare) < 0) { - heap_free(iter->stream_heap); - g_free(iter->stream_heap); - iter->stream_heap = NULL; - ret = -ENOMEM; - } - - return ret; -} - -struct bt_iter_pos *bt_iter_get_pos(struct bt_iter *iter) -{ - struct bt_iter_pos *pos; - struct trace_collection *tc; - struct ctf_file_stream *file_stream = NULL, *removed; - struct ptr_heap iter_heap_copy; - int ret; - - if (!iter) - return NULL; - - tc = iter->ctx->tc; - pos = g_new0(struct bt_iter_pos, 1); - pos->type = BT_SEEK_RESTORE; - pos->u.restore = g_new0(struct bt_saved_pos, 1); - pos->u.restore->tc = tc; - pos->u.restore->stream_saved_pos = g_array_new(FALSE, TRUE, - sizeof(struct stream_saved_pos)); - if (!pos->u.restore->stream_saved_pos) - goto error; - - ret = heap_copy(&iter_heap_copy, iter->stream_heap); - if (ret < 0) - goto error_heap; - - /* iterate over each stream in the heap */ - file_stream = heap_maximum(&iter_heap_copy); - while (file_stream != NULL) { - struct stream_saved_pos saved_pos; - - assert(file_stream->pos.last_offset != LAST_OFFSET_POISON); - saved_pos.offset = file_stream->pos.last_offset; - saved_pos.file_stream = file_stream; - saved_pos.cur_index = file_stream->pos.cur_index; - - saved_pos.current_real_timestamp = file_stream->parent.real_timestamp; - saved_pos.current_cycles_timestamp = file_stream->parent.cycles_timestamp; - - g_array_append_val( - pos->u.restore->stream_saved_pos, - saved_pos); - - printf_debug("stream : %" PRIu64 ", cur_index : %zd, " - "offset : %zd, " - "timestamp = %" PRIu64 "\n", - file_stream->parent.stream_id, - saved_pos.cur_index, saved_pos.offset, - saved_pos.current_real_timestamp); - - /* remove the stream from the heap copy */ - removed = heap_remove(&iter_heap_copy); - assert(removed == file_stream); - - file_stream = heap_maximum(&iter_heap_copy); - } - heap_free(&iter_heap_copy); - return pos; - -error_heap: - g_array_free(pos->u.restore->stream_saved_pos, TRUE); -error: - g_free(pos); - return NULL; -} - -struct bt_iter_pos *bt_iter_create_time_pos(struct bt_iter *iter, - uint64_t timestamp) -{ - struct bt_iter_pos *pos; - - if (!iter) - return NULL; - - pos = g_new0(struct bt_iter_pos, 1); - pos->type = BT_SEEK_TIME; - pos->u.seek_time = timestamp; - return pos; -} - -/* - * babeltrace_filestream_seek: seek a filestream to given position. - * - * The stream_id parameter is only useful for BT_SEEK_RESTORE. - */ -static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream, - const struct bt_iter_pos *begin_pos, - unsigned long stream_id) -{ - int ret = 0; - - if (!file_stream || !begin_pos) - return -EINVAL; - - switch (begin_pos->type) { - case BT_SEEK_CUR: - /* - * just insert into the heap we should already know - * the timestamps - */ - break; - case BT_SEEK_BEGIN: - file_stream->pos.packet_seek(&file_stream->pos.parent, - 0, SEEK_SET); - ret = stream_read_event(file_stream); - break; - case BT_SEEK_TIME: - case BT_SEEK_RESTORE: - case BT_SEEK_END: - default: - assert(0); /* Not yet defined */ - } - - return ret; -} - -int bt_iter_init(struct bt_iter *iter, - struct bt_context *ctx, - const struct bt_iter_pos *begin_pos, - const struct bt_iter_pos *end_pos) -{ - int i, stream_id; - int ret = 0; - - if (!iter || !ctx) - return -EINVAL; - - if (ctx->current_iterator) { - ret = -1; - goto error_ctx; - } - - iter->stream_heap = g_new(struct ptr_heap, 1); - iter->end_pos = end_pos; - bt_context_get(ctx); - iter->ctx = ctx; - - ret = heap_init(iter->stream_heap, 0, stream_compare); - if (ret < 0) - goto error_heap_init; - - for (i = 0; i < ctx->tc->array->len; i++) { - struct ctf_trace *tin; - struct trace_descriptor *td_read; - - td_read = g_ptr_array_index(ctx->tc->array, i); - if (!td_read) - continue; - tin = container_of(td_read, struct ctf_trace, parent); - - /* Populate heap with each stream */ - for (stream_id = 0; stream_id < tin->streams->len; - stream_id++) { - struct ctf_stream_declaration *stream; - int filenr; - - stream = g_ptr_array_index(tin->streams, stream_id); - if (!stream) - continue; - for (filenr = 0; filenr < stream->streams->len; - filenr++) { - struct ctf_file_stream *file_stream; - - file_stream = g_ptr_array_index(stream->streams, - filenr); - if (!file_stream) - continue; - if (begin_pos) { - ret = babeltrace_filestream_seek( - file_stream, - begin_pos, - stream_id); - } else { - struct bt_iter_pos pos; - pos.type = BT_SEEK_BEGIN; - ret = babeltrace_filestream_seek( - file_stream, &pos, - stream_id); - } - if (ret == EOF) { - ret = 0; - continue; - } else if (ret) { - goto error; - } - /* Add to heap */ - ret = heap_insert(iter->stream_heap, file_stream); - if (ret) - goto error; - } - } - } - - ctx->current_iterator = iter; - return 0; - -error: - heap_free(iter->stream_heap); -error_heap_init: - g_free(iter->stream_heap); - iter->stream_heap = NULL; -error_ctx: - return ret; -} - -struct bt_iter *bt_iter_create(struct bt_context *ctx, - const struct bt_iter_pos *begin_pos, - const struct bt_iter_pos *end_pos) -{ - struct bt_iter *iter; - int ret; - - if (!ctx) - return NULL; - - iter = g_new0(struct bt_iter, 1); - ret = bt_iter_init(iter, ctx, begin_pos, end_pos); - if (ret) { - g_free(iter); - return NULL; - } - return iter; -} - -void bt_iter_fini(struct bt_iter *iter) -{ - assert(iter); - if (iter->stream_heap) { - heap_free(iter->stream_heap); - g_free(iter->stream_heap); - } - iter->ctx->current_iterator = NULL; - bt_context_put(iter->ctx); -} - -void bt_iter_destroy(struct bt_iter *iter) -{ - assert(iter); - bt_iter_fini(iter); - g_free(iter); -} - -int bt_iter_next(struct bt_iter *iter) -{ - struct ctf_file_stream *file_stream, *removed; - int ret; - - if (!iter) - return -EINVAL; - - file_stream = heap_maximum(iter->stream_heap); - if (!file_stream) { - /* end of file for all streams */ - ret = 0; - goto end; - } - - ret = stream_read_event(file_stream); - if (ret == EOF) { - removed = heap_remove(iter->stream_heap); - assert(removed == file_stream); - ret = 0; - goto end; - } else if (ret) { - goto end; - } - /* Reinsert the file stream into the heap, and rebalance. */ - removed = heap_replace_max(iter->stream_heap, file_stream); - assert(removed == file_stream); - -end: - return ret; -}