Live: let read API detect inactive streams, allow streams without fd
authorJulien Desfossez <jdesfossez@efficios.com>
Thu, 12 Dec 2013 20:32:34 +0000 (15:32 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 13 Dec 2013 17:43:44 +0000 (12:43 -0500)
In live streaming, there are cases where a stream will be inactive from
beginning to end and so will never call bt_iter_next. For those cases,
we have to detect when we are reading an inactive stream which happens
when all other active streams have been closed.

We don't want to have the same checks in two places, so this fix removes
the return of EAGAIN in bt_iter_next and instead returns a NULL event
with the new BT_ITER_FLAG_RETRY flag set.

Also, handle cases where streams and traces have no associated file
descriptors.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
formats/ctf/ctf.c
formats/ctf/iterator.c
include/babeltrace/ctf/types.h
include/babeltrace/iterator.h
lib/iterator.c

index 16a7ba0fd66e831b0691b17f75a812ace6964017..122b708f6e74fdfb0b6b6241442c96aca4eb9a39 100644 (file)
@@ -472,12 +472,16 @@ int ctf_read_event(struct bt_stream_pos *ppos, struct ctf_stream_definition *str
        if (unlikely(pos->offset == EOF))
                return EOF;
 
-       if (pos->content_size == 0) {
-               /* Stream is inactive for now (live reading). */
+       /* Stream is inactive for now (live reading). */
+       if (unlikely(pos->content_size == 0))
                return EAGAIN;
-       }
-       /* Packet only contains headers */
-       if (pos->offset == pos->content_size)
+
+       /*
+        * Packet seeked to by ctf_pos_get_event() only contains
+        * headers, no event. Consider stream as inactive (live
+        * reading).
+        */
+       if (unlikely(pos->data_offset == pos->content_size))
                return EAGAIN;
 
        assert(pos->offset < pos->content_size);
@@ -957,9 +961,10 @@ void ctf_packet_seek(struct bt_stream_pos *stream_pos, size_t index, int whence)
                pos->content_size = packet_index->content_size;
                pos->packet_size = packet_index->packet_size;
                pos->mmap_offset = packet_index->offset;
-               if (packet_index->data_offset < packet_index->content_size) {
+               pos->data_offset = packet_index->data_offset;
+               if (pos->data_offset < packet_index->content_size) {
                        pos->offset = 0;        /* will read headers */
-               } else if (packet_index->data_offset == packet_index->content_size) {
+               } else if (pos->data_offset == packet_index->content_size) {
                        /* empty packet */
                        pos->offset = packet_index->data_offset;
                        whence = SEEK_CUR;
@@ -2386,10 +2391,12 @@ int ctf_close_file_stream(struct ctf_file_stream *file_stream)
                fprintf(stderr, "Error on ctf_fini_pos\n");
                return -1;
        }
-       ret = close(file_stream->pos.fd);
-       if (ret) {
-               perror("Error closing file fd");
-               return -1;
+       if (file_stream->pos.fd >= 0) {
+               ret = close(file_stream->pos.fd);
+               if (ret) {
+                       perror("Error closing file fd");
+                       return -1;
+               }
        }
        return 0;
 }
@@ -2421,15 +2428,19 @@ int ctf_close_trace(struct bt_trace_descriptor *tdp)
                }
        }
        ctf_destroy_metadata(td);
-       ret = close(td->dirfd);
-       if (ret) {
-               perror("Error closing dirfd");
-               return ret;
+       if (td->dirfd >= 0) {
+               ret = close(td->dirfd);
+               if (ret) {
+                       perror("Error closing dirfd");
+                       return ret;
+               }
        }
-       ret = closedir(td->dir);
-       if (ret) {
-               perror("Error closedir");
-               return ret;
+       if (td->dir) {
+               ret = closedir(td->dir);
+               if (ret) {
+                       perror("Error closedir");
+                       return ret;
+               }
        }
        free(td->metadata_string);
        g_free(td);
index d1152fcaf86852e418d09efed280ae611606adf3..69bb0f4b7c513ff7b0bd9c8b07ad7cd9669c62f0 100644 (file)
@@ -119,12 +119,30 @@ struct bt_ctf_event *bt_ctf_iter_read_event_flags(struct bt_ctf_iter *iter,
         */
        assert(iter);
 
+       if (flags)
+               *flags = 0;
+
        ret = &iter->current_ctf_event;
        file_stream = bt_heap_maximum(iter->parent.stream_heap);
        if (!file_stream) {
                /* end of file for all streams */
                goto stop;
        }
+
+       /*
+        * If the packet is empty (contains only headers or is of size 0), the
+        * caller has to know that we can't read the current event and we need
+        * to do a bt_iter_next.
+        */
+       if (file_stream->pos.data_offset == file_stream->pos.content_size
+                       || file_stream->pos.content_size == 0) {
+               /* More events may come. */
+               ret = NULL;
+               if (flags)
+                       *flags |= BT_ITER_FLAG_RETRY;
+               goto end;
+       }
+
        stream = &file_stream->parent;
        if (iter->parent.end_pos &&
                iter->parent.end_pos->type == BT_SEEK_TIME &&
@@ -134,8 +152,6 @@ struct bt_ctf_event *bt_ctf_iter_read_event_flags(struct bt_ctf_iter *iter,
        ret->parent = g_ptr_array_index(stream->events_by_id,
                        stream->event_id);
 
-       if (flags)
-               *flags = 0;
        if (!file_stream->pos.packet_cycles_index)
                packet_index = NULL;
        else
index 06c130f80d8773a691b427479aa830020690b3ce..338dad167ec976248c23a65c16b93973b9f44518 100644 (file)
@@ -76,6 +76,7 @@ struct ctf_stream_pos {
        struct mmap_align *base_mma;/* mmap base address */
        int64_t offset;         /* offset from base, in bits. EOF for end of file. */
        int64_t last_offset;    /* offset before the last read_event */
+       int64_t data_offset;    /* offset of data in current packet */
        uint64_t cur_index;     /* current index in packet index */
        uint64_t last_events_discarded; /* last known amount of event discarded */
        void (*packet_seek)(struct bt_stream_pos *pos, size_t index,
index 50232a905fc7df90268d62c4e58c28ed9d0cd025..5c3939c32b4261eaae4ed3676a20fbb205586c73 100644 (file)
@@ -35,6 +35,7 @@ extern "C" {
 /* Flags for the iterator read_event */
 enum {
        BT_ITER_FLAG_LOST_EVENTS        = (1 << 0),
+       BT_ITER_FLAG_RETRY              = (1 << 1),
 };
 
 /* Forward declarations */
index a2a7bb578adca29313b9bda620382bfe67b8248c..853898d33e995e7004c177bff0d5782272c9898c 100644 (file)
@@ -806,9 +806,18 @@ int bt_iter_next(struct bt_iter *iter)
                goto end;
        } else if (ret == EAGAIN) {
                /*
-                * The stream is inactive for now, we just updated the timestamp_end
-                * to skip over this stream up to a certain point in time.
+                * Live streaming: the stream is inactive for now, we
+                * just updated the timestamp_end to skip over this
+                * stream up to a certain point in time.
+                *
+                * Since we can't guarantee that a stream will ever have
+                * any activity, we can't rely on the fact that
+                * bt_iter_next will be called for each stream and deal
+                * with inactive streams. So instead, we return 0 here
+                * to the caller and let the read API handle the
+                * retry case.
                 */
+               ret = 0;
                goto reinsert;
        } else if (ret) {
                goto end;
@@ -820,11 +829,6 @@ reinsert:
        assert(removed == file_stream);
 
        file_stream = bt_heap_maximum(iter->stream_heap);
-       if (file_stream->pos.content_size == 0) {
-               ret = EAGAIN;
-       } else {
-               ret = 0;
-       }
 
 end:
        return ret;
This page took 0.028852 seconds and 4 git commands to generate.