API : iterator get and set position
authorJulien Desfossez <julien.desfossez@efficios.com>
Fri, 10 Feb 2012 02:32:52 +0000 (21:32 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 10 Feb 2012 02:32:52 +0000 (21:32 -0500)
This patch implements the save and restore position on a iterator.

Passed successfully these tests :
- test 1:
save the position
read n events (tested with multiple values of n)
restore
compare the outputs

- test 2:
in loop until the end of the trace :
        save the position
        read 5 events
        restore the position
        read 5 events
compare each block of 5 events

- test 3:
save the position before printing the first event
parse the trace entirely
restore to the beginning
parse the trace entirely
compare the 2 outputs

- test 4:
seek in the future (manually tweaked the values to restore to)
compare with a normal trace starting from the restore point

[ edit: use g_new/g_new0/g_free instead of malloc/calloc/free.
  Use g_new0 anytime a structure is not explicitly initialized by
  a helper, so if it grows in the future with new fields, they will be
  zeroed. ]

Signed-off-by: Julien Desfossez <julien.desfossez@efficios.com>
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
formats/ctf-text/ctf-text.c
formats/ctf/ctf.c
include/babeltrace/ctf-ir/metadata.h
include/babeltrace/ctf/types.h
include/babeltrace/iterator.h
lib/iterator.c

index 81dbb3eb159ef0b423ea09696c086726ce5272f7..facfe57e9da203084795d19db507d3e4386e5c62 100644 (file)
@@ -387,6 +387,7 @@ int ctf_text_write_event(struct stream_pos *ppos,
        /* newline */
        fprintf(pos->fp, "\n");
        pos->field_nr = 0;
+       stream->consumed = 1;
 
        return 0;
 
index 80bd37869ed897f947c1b99e3f871b3d93d9692d..3ce428c387681db74cefadffd0f861daad43aba0 100644 (file)
@@ -229,6 +229,11 @@ int ctf_read_event(struct stream_pos *ppos, struct ctf_stream *stream)
 
        ctf_pos_get_event(pos);
 
+       /* save the current position as a restore point */
+       pos->last_offset = pos->offset;
+       /* we just read the event, it is consumed when used by the caller */
+       stream->consumed = 0;
+
        /*
         * This is the EOF check after we've advanced the position in
         * ctf_pos_get_event.
@@ -507,8 +512,8 @@ void ctf_move_pos_slow(struct ctf_stream_pos *pos, size_t offset, int whence)
                        break;
                }
                case SEEK_SET:
-                       assert(offset == 0);    /* only seek supported for now */
-                       pos->cur_index = 0;
+                       if (offset == 0)
+                               pos->cur_index = 0;
                        file_stream->parent.prev_timestamp = 0;
                        file_stream->parent.prev_timestamp_end = 0;
                        break;
index 29d0fd492ec34d72d481d391eeb077b7b52e1919..0e5842e4025e8658cab278fde68b2e93451ca0ad 100644 (file)
@@ -41,6 +41,7 @@ struct ctf_stream {
        uint64_t event_id;                      /* Current event ID */
        int has_timestamp;
        uint64_t stream_id;
+       int consumed;                           /* Last packet used by caller */
 
        struct definition_struct *trace_packet_header;
        struct definition_struct *stream_packet_context;
index c5c824fdb8bab26f3d6808ec9562a6f70a807fa9..12cca6e12539ddc0880b5e55677f476800882c2e 100644 (file)
@@ -60,6 +60,7 @@ struct ctf_stream_pos {
        uint32_t *content_size_loc; /* pointer to current content size */
        char *base;             /* mmap base address */
        ssize_t offset;         /* offset from base, in bits. EOF for end of file. */
+       ssize_t last_offset;    /* offset before the last read_event */
        size_t cur_index;       /* current index in packet index */
        void (*move_pos_slow)(struct ctf_stream_pos *pos, size_t offset,
                        int whence); /* function called to switch packet */
index 1de2809b88a8562715689cb33d903b5c91d0deb9..2760724291250be7a71f328af0e33c081bdd82d7 100644 (file)
@@ -69,13 +69,13 @@ void bt_iter_destroy(struct bt_iter *iter);
 int bt_iter_next(struct bt_iter *iter);
 
 /*
- * bt_iter_save_pos - Save the current trace collection position.
+ * bt_iter_get_pos - Get the current iterator position.
  *
  * The position returned by this function needs to be freed by
  * bt_iter_free_pos after use.
  */
 struct bt_iter_pos *
-       bt_iter_save_pos(struct bt_iter *iter);
+       bt_iter_get_pos(struct bt_iter *iter);
 
 /*
  * bt_iter_free_pos - Free the position.
@@ -83,13 +83,13 @@ struct bt_iter_pos *
 void bt_iter_free_pos(struct bt_iter_pos *pos);
 
 /*
- * bt_iter_seek: seek iterator to given position.
+ * bt_iter_set_pos: move the iterator to a given position.
  *
  * Return EOF if position is after the last event of the trace collection.
  * Return other negative value for other errors.
  * Return 0 for success.
  */
-int bt_iter_seek(struct bt_iter *iter,
+int bt_iter_set_pos(struct bt_iter *iter,
                const struct bt_iter_pos *pos);
 
 /*
index 4d201dc48942d911edc5647db48837869fcb526a..0ae2a1ba1b265451f50f1c9e0c44d2f314248767 100644 (file)
@@ -26,6 +26,7 @@
 #include <babeltrace/iterator-internal.h>
 #include <babeltrace/iterator.h>
 #include <babeltrace/prio_heap.h>
+#include <inttypes.h>
 
 struct stream_saved_pos {
        /*
@@ -35,6 +36,7 @@ struct stream_saved_pos {
        struct ctf_file_stream *file_stream;
        size_t cur_index;       /* current index in packet index */
        ssize_t offset;         /* offset from base, in bits. EOF for end of file. */
+       uint64_t current_timestamp;
 };
 
 struct bt_saved_pos {
@@ -69,6 +71,184 @@ int stream_compare(void *a, void *b)
                return 0;
 }
 
+void bt_iter_free_pos(struct bt_iter_pos *iter_pos)
+{
+       if (iter_pos) {
+               if (iter_pos->u.restore) {
+                       if (iter_pos->u.restore->stream_saved_pos) {
+                               g_array_free(
+                                       iter_pos->u.restore->stream_saved_pos,
+                                       TRUE);
+                       }
+                       g_free(iter_pos->u.restore);
+               }
+               g_free(iter_pos);
+       }
+}
+
+int bt_iter_set_pos(struct bt_iter *iter, const struct bt_iter_pos *iter_pos)
+{
+       int i, ret;
+
+       if (iter_pos->u.restore) {
+               /* clear and recreate the heap */
+               heap_free(iter->stream_heap);
+               g_free(iter->stream_heap);
+               iter->stream_heap = g_new(struct ptr_heap, 1);
+               ret = heap_init(iter->stream_heap, 0, stream_compare);
+               if (ret < 0)
+                       goto error_heap_init;
+
+               for (i = 0; i < iter_pos->u.restore->stream_saved_pos->len;
+                               i++) {
+                       struct stream_saved_pos *saved_pos;
+                       struct ctf_stream_pos *stream_pos;
+                       struct packet_index *index;
+                       struct ctf_stream *stream;
+
+                       saved_pos = &g_array_index(
+                                       iter_pos->u.restore->stream_saved_pos,
+                                       struct stream_saved_pos, i);
+                       stream = &saved_pos->file_stream->parent;
+                       stream_pos = &saved_pos->file_stream->pos;
+                       index = &g_array_index(stream_pos->packet_index,
+                                       struct packet_index,
+                                       saved_pos->cur_index);
+
+                       stream_pos->cur_index = saved_pos->cur_index;
+                       stream_pos->move_pos_slow(stream_pos, index->offset,
+                                       SEEK_SET);
+
+                       /*
+                        * the timestamp needs to be restored after
+                        * move_pos_slow, because this function resets
+                        * the timestamp to the beginning of the packet
+                        */
+                       stream->timestamp = saved_pos->current_timestamp;
+                       stream_pos->offset = saved_pos->offset;
+                       stream_pos->last_offset = saved_pos->offset;
+
+                       stream->prev_timestamp = 0;
+                       stream->prev_timestamp_end = 0;
+                       stream->consumed = 0;
+
+                       printf_debug("restored to cur_index = %zd and "
+                               "offset = %zd, timestamp = %" PRIu64 "\n",
+                               stream_pos->cur_index,
+                               stream_pos->offset, stream->timestamp);
+
+                       stream_read_event(saved_pos->file_stream);
+
+                       /* Add to heap */
+                       ret = heap_insert(iter->stream_heap,
+                                       saved_pos->file_stream);
+                       if (ret)
+                               goto error;
+               }
+       } else if (iter_pos->u.seek_time) {
+               /* not yet implemented */
+               return -1;
+       } else {
+               /* nowhere to seek to */
+               return -1;
+       }
+
+       return 0;
+error:
+       heap_free(iter->stream_heap);
+error_heap_init:
+       g_free(iter->stream_heap);
+       return -1;
+}
+
+struct bt_iter_pos *bt_iter_get_pos(struct bt_iter *iter)
+{
+       struct bt_iter_pos *pos;
+       struct trace_collection *tc = iter->ctx->tc;
+       int i, stream_class_id, stream_id;
+
+       pos = g_new0(struct bt_iter_pos, 1);
+       if (!pos) {
+               perror("allocating bt_iter_pos");
+               goto error;
+       }
+
+       pos->u.restore = g_new0(struct bt_saved_pos, 1);
+       if (!pos->u.restore) {
+               perror("allocating bt_saved_pos");
+               goto error;
+       }
+
+       pos->u.restore->tc = tc;
+       pos->u.restore->stream_saved_pos = g_array_new(FALSE, TRUE,
+                       sizeof(struct stream_saved_pos));
+       if (!pos->u.restore->stream_saved_pos)
+               goto error;
+
+       for (i = 0; i < tc->array->len; i++) {
+               struct ctf_trace *tin;
+               struct trace_descriptor *td_read;
+
+               td_read = g_ptr_array_index(tc->array, i);
+               tin = container_of(td_read, struct ctf_trace, parent);
+
+               for (stream_class_id = 0; stream_class_id < tin->streams->len;
+                               stream_class_id++) {
+                       struct ctf_stream_class *stream_class;
+
+                       stream_class = g_ptr_array_index(tin->streams,
+                                       stream_class_id);
+                       for (stream_id = 0;
+                                       stream_id < stream_class->streams->len;
+                                       stream_id++) {
+                               struct ctf_stream *stream;
+                               struct ctf_file_stream *cfs;
+                               struct stream_saved_pos saved_pos;
+
+                               stream = g_ptr_array_index(
+                                               stream_class->streams,
+                                               stream_id);
+                               cfs = container_of(stream,
+                                               struct ctf_file_stream,
+                                               parent);
+
+                               saved_pos.file_stream = cfs;
+                               saved_pos.cur_index = cfs->pos.cur_index;
+
+                               /*
+                                * It is possible that an event was read during
+                                * the last restore, never consumed and its
+                                * position saved again.  For this case, we
+                                * need to check if the event really was
+                                * consumed by the caller otherwise it is lost.
+                                */
+                               if (stream->consumed)
+                                       saved_pos.offset = cfs->pos.offset;
+                               else
+                                       saved_pos.offset = cfs->pos.last_offset;
+
+                               saved_pos.current_timestamp = stream->timestamp;
+
+                               g_array_append_val(
+                                       pos->u.restore->stream_saved_pos,
+                                       saved_pos);
+
+                               printf_debug("stream : %lu, cur_index : %zd, "
+                                       "offset : %zd, "
+                                       "timestamp = %" PRIu64 "\n",
+                                       stream->stream_id, saved_pos.cur_index,
+                                       saved_pos.offset,
+                                       saved_pos.current_timestamp);
+                       }
+               }
+       }
+
+       return pos;
+
+error:
+       return NULL;
+}
+
 /*
  * babeltrace_filestream_seek: seek a filestream to given position.
  *
@@ -150,7 +330,7 @@ struct bt_iter *bt_iter_create(struct bt_context *ctx,
        int ret = 0;
        struct bt_iter *iter;
 
-       iter = malloc(sizeof(struct bt_iter));
+       iter = g_new0(struct bt_iter, 1);
        if (!iter)
                goto error_malloc;
        iter->stream_heap = g_new(struct ptr_heap, 1);
@@ -215,7 +395,7 @@ error:
 error_heap_init:
        g_free(iter->stream_heap);
 error_ctx:
-       free(iter);
+       g_free(iter);
 error_malloc:
        return NULL;
 }
@@ -251,7 +431,7 @@ void bt_iter_destroy(struct bt_iter *iter)
 
        bt_context_put(iter->ctx);
 
-       free(iter);
+       g_free(iter);
 }
 
 int bt_iter_next(struct bt_iter *iter)
This page took 0.030105 seconds and 4 git commands to generate.