API : iterator get and set position
[babeltrace.git] / lib / iterator.c
index 4d201dc48942d911edc5647db48837869fcb526a..0ae2a1ba1b265451f50f1c9e0c44d2f314248767 100644 (file)
@@ -26,6 +26,7 @@
 #include <babeltrace/iterator-internal.h>
 #include <babeltrace/iterator.h>
 #include <babeltrace/prio_heap.h>
+#include <inttypes.h>
 
 struct stream_saved_pos {
        /*
@@ -35,6 +36,7 @@ struct stream_saved_pos {
        struct ctf_file_stream *file_stream;
        size_t cur_index;       /* current index in packet index */
        ssize_t offset;         /* offset from base, in bits. EOF for end of file. */
+       uint64_t current_timestamp;
 };
 
 struct bt_saved_pos {
@@ -69,6 +71,184 @@ int stream_compare(void *a, void *b)
                return 0;
 }
 
+void bt_iter_free_pos(struct bt_iter_pos *iter_pos)
+{
+       if (iter_pos) {
+               if (iter_pos->u.restore) {
+                       if (iter_pos->u.restore->stream_saved_pos) {
+                               g_array_free(
+                                       iter_pos->u.restore->stream_saved_pos,
+                                       TRUE);
+                       }
+                       g_free(iter_pos->u.restore);
+               }
+               g_free(iter_pos);
+       }
+}
+
+int bt_iter_set_pos(struct bt_iter *iter, const struct bt_iter_pos *iter_pos)
+{
+       int i, ret;
+
+       if (iter_pos->u.restore) {
+               /* clear and recreate the heap */
+               heap_free(iter->stream_heap);
+               g_free(iter->stream_heap);
+               iter->stream_heap = g_new(struct ptr_heap, 1);
+               ret = heap_init(iter->stream_heap, 0, stream_compare);
+               if (ret < 0)
+                       goto error_heap_init;
+
+               for (i = 0; i < iter_pos->u.restore->stream_saved_pos->len;
+                               i++) {
+                       struct stream_saved_pos *saved_pos;
+                       struct ctf_stream_pos *stream_pos;
+                       struct packet_index *index;
+                       struct ctf_stream *stream;
+
+                       saved_pos = &g_array_index(
+                                       iter_pos->u.restore->stream_saved_pos,
+                                       struct stream_saved_pos, i);
+                       stream = &saved_pos->file_stream->parent;
+                       stream_pos = &saved_pos->file_stream->pos;
+                       index = &g_array_index(stream_pos->packet_index,
+                                       struct packet_index,
+                                       saved_pos->cur_index);
+
+                       stream_pos->cur_index = saved_pos->cur_index;
+                       stream_pos->move_pos_slow(stream_pos, index->offset,
+                                       SEEK_SET);
+
+                       /*
+                        * the timestamp needs to be restored after
+                        * move_pos_slow, because this function resets
+                        * the timestamp to the beginning of the packet
+                        */
+                       stream->timestamp = saved_pos->current_timestamp;
+                       stream_pos->offset = saved_pos->offset;
+                       stream_pos->last_offset = saved_pos->offset;
+
+                       stream->prev_timestamp = 0;
+                       stream->prev_timestamp_end = 0;
+                       stream->consumed = 0;
+
+                       printf_debug("restored to cur_index = %zd and "
+                               "offset = %zd, timestamp = %" PRIu64 "\n",
+                               stream_pos->cur_index,
+                               stream_pos->offset, stream->timestamp);
+
+                       stream_read_event(saved_pos->file_stream);
+
+                       /* Add to heap */
+                       ret = heap_insert(iter->stream_heap,
+                                       saved_pos->file_stream);
+                       if (ret)
+                               goto error;
+               }
+       } else if (iter_pos->u.seek_time) {
+               /* not yet implemented */
+               return -1;
+       } else {
+               /* nowhere to seek to */
+               return -1;
+       }
+
+       return 0;
+error:
+       heap_free(iter->stream_heap);
+error_heap_init:
+       g_free(iter->stream_heap);
+       return -1;
+}
+
+struct bt_iter_pos *bt_iter_get_pos(struct bt_iter *iter)
+{
+       struct bt_iter_pos *pos;
+       struct trace_collection *tc = iter->ctx->tc;
+       int i, stream_class_id, stream_id;
+
+       pos = g_new0(struct bt_iter_pos, 1);
+       if (!pos) {
+               perror("allocating bt_iter_pos");
+               goto error;
+       }
+
+       pos->u.restore = g_new0(struct bt_saved_pos, 1);
+       if (!pos->u.restore) {
+               perror("allocating bt_saved_pos");
+               goto error;
+       }
+
+       pos->u.restore->tc = tc;
+       pos->u.restore->stream_saved_pos = g_array_new(FALSE, TRUE,
+                       sizeof(struct stream_saved_pos));
+       if (!pos->u.restore->stream_saved_pos)
+               goto error;
+
+       for (i = 0; i < tc->array->len; i++) {
+               struct ctf_trace *tin;
+               struct trace_descriptor *td_read;
+
+               td_read = g_ptr_array_index(tc->array, i);
+               tin = container_of(td_read, struct ctf_trace, parent);
+
+               for (stream_class_id = 0; stream_class_id < tin->streams->len;
+                               stream_class_id++) {
+                       struct ctf_stream_class *stream_class;
+
+                       stream_class = g_ptr_array_index(tin->streams,
+                                       stream_class_id);
+                       for (stream_id = 0;
+                                       stream_id < stream_class->streams->len;
+                                       stream_id++) {
+                               struct ctf_stream *stream;
+                               struct ctf_file_stream *cfs;
+                               struct stream_saved_pos saved_pos;
+
+                               stream = g_ptr_array_index(
+                                               stream_class->streams,
+                                               stream_id);
+                               cfs = container_of(stream,
+                                               struct ctf_file_stream,
+                                               parent);
+
+                               saved_pos.file_stream = cfs;
+                               saved_pos.cur_index = cfs->pos.cur_index;
+
+                               /*
+                                * It is possible that an event was read during
+                                * the last restore, never consumed and its
+                                * position saved again.  For this case, we
+                                * need to check if the event really was
+                                * consumed by the caller otherwise it is lost.
+                                */
+                               if (stream->consumed)
+                                       saved_pos.offset = cfs->pos.offset;
+                               else
+                                       saved_pos.offset = cfs->pos.last_offset;
+
+                               saved_pos.current_timestamp = stream->timestamp;
+
+                               g_array_append_val(
+                                       pos->u.restore->stream_saved_pos,
+                                       saved_pos);
+
+                               printf_debug("stream : %lu, cur_index : %zd, "
+                                       "offset : %zd, "
+                                       "timestamp = %" PRIu64 "\n",
+                                       stream->stream_id, saved_pos.cur_index,
+                                       saved_pos.offset,
+                                       saved_pos.current_timestamp);
+                       }
+               }
+       }
+
+       return pos;
+
+error:
+       return NULL;
+}
+
 /*
  * babeltrace_filestream_seek: seek a filestream to given position.
  *
@@ -150,7 +330,7 @@ struct bt_iter *bt_iter_create(struct bt_context *ctx,
        int ret = 0;
        struct bt_iter *iter;
 
-       iter = malloc(sizeof(struct bt_iter));
+       iter = g_new0(struct bt_iter, 1);
        if (!iter)
                goto error_malloc;
        iter->stream_heap = g_new(struct ptr_heap, 1);
@@ -215,7 +395,7 @@ error:
 error_heap_init:
        g_free(iter->stream_heap);
 error_ctx:
-       free(iter);
+       g_free(iter);
 error_malloc:
        return NULL;
 }
@@ -251,7 +431,7 @@ void bt_iter_destroy(struct bt_iter *iter)
 
        bt_context_put(iter->ctx);
 
-       free(iter);
+       g_free(iter);
 }
 
 int bt_iter_next(struct bt_iter *iter)
This page took 0.038198 seconds and 4 git commands to generate.