From: Julien Desfossez Date: Fri, 10 Feb 2012 02:32:52 +0000 (-0500) Subject: API : iterator get and set position X-Git-Tag: v0.11~12 X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=commitdiff_plain;h=90fcbacc6eb672f0d5e0c9d6722d397ac9f1a63e API : iterator get and set position This patch implements the save and restore position on a iterator. Passed successfully these tests : - test 1: save the position read n events (tested with multiple values of n) restore compare the outputs - test 2: in loop until the end of the trace : save the position read 5 events restore the position read 5 events compare each block of 5 events - test 3: save the position before printing the first event parse the trace entirely restore to the beginning parse the trace entirely compare the 2 outputs - test 4: seek in the future (manually tweaked the values to restore to) compare with a normal trace starting from the restore point [ edit: use g_new/g_new0/g_free instead of malloc/calloc/free. Use g_new0 anytime a structure is not explicitly initialized by a helper, so if it grows in the future with new fields, they will be zeroed. ] Signed-off-by: Julien Desfossez Signed-off-by: Mathieu Desnoyers --- diff --git a/formats/ctf-text/ctf-text.c b/formats/ctf-text/ctf-text.c index 81dbb3eb..facfe57e 100644 --- a/formats/ctf-text/ctf-text.c +++ b/formats/ctf-text/ctf-text.c @@ -387,6 +387,7 @@ int ctf_text_write_event(struct stream_pos *ppos, /* newline */ fprintf(pos->fp, "\n"); pos->field_nr = 0; + stream->consumed = 1; return 0; diff --git a/formats/ctf/ctf.c b/formats/ctf/ctf.c index 80bd3786..3ce428c3 100644 --- a/formats/ctf/ctf.c +++ b/formats/ctf/ctf.c @@ -229,6 +229,11 @@ int ctf_read_event(struct stream_pos *ppos, struct ctf_stream *stream) ctf_pos_get_event(pos); + /* save the current position as a restore point */ + pos->last_offset = pos->offset; + /* we just read the event, it is consumed when used by the caller */ + stream->consumed = 0; + /* * This is the EOF check after we've advanced the position in * ctf_pos_get_event. @@ -507,8 +512,8 @@ void ctf_move_pos_slow(struct ctf_stream_pos *pos, size_t offset, int whence) break; } case SEEK_SET: - assert(offset == 0); /* only seek supported for now */ - pos->cur_index = 0; + if (offset == 0) + pos->cur_index = 0; file_stream->parent.prev_timestamp = 0; file_stream->parent.prev_timestamp_end = 0; break; diff --git a/include/babeltrace/ctf-ir/metadata.h b/include/babeltrace/ctf-ir/metadata.h index 29d0fd49..0e5842e4 100644 --- a/include/babeltrace/ctf-ir/metadata.h +++ b/include/babeltrace/ctf-ir/metadata.h @@ -41,6 +41,7 @@ struct ctf_stream { uint64_t event_id; /* Current event ID */ int has_timestamp; uint64_t stream_id; + int consumed; /* Last packet used by caller */ struct definition_struct *trace_packet_header; struct definition_struct *stream_packet_context; diff --git a/include/babeltrace/ctf/types.h b/include/babeltrace/ctf/types.h index c5c824fd..12cca6e1 100644 --- a/include/babeltrace/ctf/types.h +++ b/include/babeltrace/ctf/types.h @@ -60,6 +60,7 @@ struct ctf_stream_pos { uint32_t *content_size_loc; /* pointer to current content size */ char *base; /* mmap base address */ ssize_t offset; /* offset from base, in bits. EOF for end of file. */ + ssize_t last_offset; /* offset before the last read_event */ size_t cur_index; /* current index in packet index */ void (*move_pos_slow)(struct ctf_stream_pos *pos, size_t offset, int whence); /* function called to switch packet */ diff --git a/include/babeltrace/iterator.h b/include/babeltrace/iterator.h index 1de2809b..27607242 100644 --- a/include/babeltrace/iterator.h +++ b/include/babeltrace/iterator.h @@ -69,13 +69,13 @@ void bt_iter_destroy(struct bt_iter *iter); int bt_iter_next(struct bt_iter *iter); /* - * bt_iter_save_pos - Save the current trace collection position. + * bt_iter_get_pos - Get the current iterator position. * * The position returned by this function needs to be freed by * bt_iter_free_pos after use. */ struct bt_iter_pos * - bt_iter_save_pos(struct bt_iter *iter); + bt_iter_get_pos(struct bt_iter *iter); /* * bt_iter_free_pos - Free the position. @@ -83,13 +83,13 @@ struct bt_iter_pos * void bt_iter_free_pos(struct bt_iter_pos *pos); /* - * bt_iter_seek: seek iterator to given position. + * bt_iter_set_pos: move the iterator to a given position. * * Return EOF if position is after the last event of the trace collection. * Return other negative value for other errors. * Return 0 for success. */ -int bt_iter_seek(struct bt_iter *iter, +int bt_iter_set_pos(struct bt_iter *iter, const struct bt_iter_pos *pos); /* diff --git a/lib/iterator.c b/lib/iterator.c index 4d201dc4..0ae2a1ba 100644 --- a/lib/iterator.c +++ b/lib/iterator.c @@ -26,6 +26,7 @@ #include #include #include +#include struct stream_saved_pos { /* @@ -35,6 +36,7 @@ struct stream_saved_pos { struct ctf_file_stream *file_stream; size_t cur_index; /* current index in packet index */ ssize_t offset; /* offset from base, in bits. EOF for end of file. */ + uint64_t current_timestamp; }; struct bt_saved_pos { @@ -69,6 +71,184 @@ int stream_compare(void *a, void *b) return 0; } +void bt_iter_free_pos(struct bt_iter_pos *iter_pos) +{ + if (iter_pos) { + if (iter_pos->u.restore) { + if (iter_pos->u.restore->stream_saved_pos) { + g_array_free( + iter_pos->u.restore->stream_saved_pos, + TRUE); + } + g_free(iter_pos->u.restore); + } + g_free(iter_pos); + } +} + +int bt_iter_set_pos(struct bt_iter *iter, const struct bt_iter_pos *iter_pos) +{ + int i, ret; + + if (iter_pos->u.restore) { + /* clear and recreate the heap */ + heap_free(iter->stream_heap); + g_free(iter->stream_heap); + iter->stream_heap = g_new(struct ptr_heap, 1); + ret = heap_init(iter->stream_heap, 0, stream_compare); + if (ret < 0) + goto error_heap_init; + + for (i = 0; i < iter_pos->u.restore->stream_saved_pos->len; + i++) { + struct stream_saved_pos *saved_pos; + struct ctf_stream_pos *stream_pos; + struct packet_index *index; + struct ctf_stream *stream; + + saved_pos = &g_array_index( + iter_pos->u.restore->stream_saved_pos, + struct stream_saved_pos, i); + stream = &saved_pos->file_stream->parent; + stream_pos = &saved_pos->file_stream->pos; + index = &g_array_index(stream_pos->packet_index, + struct packet_index, + saved_pos->cur_index); + + stream_pos->cur_index = saved_pos->cur_index; + stream_pos->move_pos_slow(stream_pos, index->offset, + SEEK_SET); + + /* + * the timestamp needs to be restored after + * move_pos_slow, because this function resets + * the timestamp to the beginning of the packet + */ + stream->timestamp = saved_pos->current_timestamp; + stream_pos->offset = saved_pos->offset; + stream_pos->last_offset = saved_pos->offset; + + stream->prev_timestamp = 0; + stream->prev_timestamp_end = 0; + stream->consumed = 0; + + printf_debug("restored to cur_index = %zd and " + "offset = %zd, timestamp = %" PRIu64 "\n", + stream_pos->cur_index, + stream_pos->offset, stream->timestamp); + + stream_read_event(saved_pos->file_stream); + + /* Add to heap */ + ret = heap_insert(iter->stream_heap, + saved_pos->file_stream); + if (ret) + goto error; + } + } else if (iter_pos->u.seek_time) { + /* not yet implemented */ + return -1; + } else { + /* nowhere to seek to */ + return -1; + } + + return 0; +error: + heap_free(iter->stream_heap); +error_heap_init: + g_free(iter->stream_heap); + return -1; +} + +struct bt_iter_pos *bt_iter_get_pos(struct bt_iter *iter) +{ + struct bt_iter_pos *pos; + struct trace_collection *tc = iter->ctx->tc; + int i, stream_class_id, stream_id; + + pos = g_new0(struct bt_iter_pos, 1); + if (!pos) { + perror("allocating bt_iter_pos"); + goto error; + } + + pos->u.restore = g_new0(struct bt_saved_pos, 1); + if (!pos->u.restore) { + perror("allocating bt_saved_pos"); + goto error; + } + + pos->u.restore->tc = tc; + pos->u.restore->stream_saved_pos = g_array_new(FALSE, TRUE, + sizeof(struct stream_saved_pos)); + if (!pos->u.restore->stream_saved_pos) + goto error; + + for (i = 0; i < tc->array->len; i++) { + struct ctf_trace *tin; + struct trace_descriptor *td_read; + + td_read = g_ptr_array_index(tc->array, i); + tin = container_of(td_read, struct ctf_trace, parent); + + for (stream_class_id = 0; stream_class_id < tin->streams->len; + stream_class_id++) { + struct ctf_stream_class *stream_class; + + stream_class = g_ptr_array_index(tin->streams, + stream_class_id); + for (stream_id = 0; + stream_id < stream_class->streams->len; + stream_id++) { + struct ctf_stream *stream; + struct ctf_file_stream *cfs; + struct stream_saved_pos saved_pos; + + stream = g_ptr_array_index( + stream_class->streams, + stream_id); + cfs = container_of(stream, + struct ctf_file_stream, + parent); + + saved_pos.file_stream = cfs; + saved_pos.cur_index = cfs->pos.cur_index; + + /* + * It is possible that an event was read during + * the last restore, never consumed and its + * position saved again. For this case, we + * need to check if the event really was + * consumed by the caller otherwise it is lost. + */ + if (stream->consumed) + saved_pos.offset = cfs->pos.offset; + else + saved_pos.offset = cfs->pos.last_offset; + + saved_pos.current_timestamp = stream->timestamp; + + g_array_append_val( + pos->u.restore->stream_saved_pos, + saved_pos); + + printf_debug("stream : %lu, cur_index : %zd, " + "offset : %zd, " + "timestamp = %" PRIu64 "\n", + stream->stream_id, saved_pos.cur_index, + saved_pos.offset, + saved_pos.current_timestamp); + } + } + } + + return pos; + +error: + return NULL; +} + /* * babeltrace_filestream_seek: seek a filestream to given position. * @@ -150,7 +330,7 @@ struct bt_iter *bt_iter_create(struct bt_context *ctx, int ret = 0; struct bt_iter *iter; - iter = malloc(sizeof(struct bt_iter)); + iter = g_new0(struct bt_iter, 1); if (!iter) goto error_malloc; iter->stream_heap = g_new(struct ptr_heap, 1); @@ -215,7 +395,7 @@ error: error_heap_init: g_free(iter->stream_heap); error_ctx: - free(iter); + g_free(iter); error_malloc: return NULL; } @@ -251,7 +431,7 @@ void bt_iter_destroy(struct bt_iter *iter) bt_context_put(iter->ctx); - free(iter); + g_free(iter); } int bt_iter_next(struct bt_iter *iter)