From: Julien Desfossez Date: Thu, 12 Dec 2013 20:32:34 +0000 (-0500) Subject: Live: let read API detect inactive streams, allow streams without fd X-Git-Tag: v1.2.0-rc1~27 X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=commitdiff_plain;h=500634bebf68d5a018f2d9c36a104b8706f59818 Live: let read API detect inactive streams, allow streams without fd In live streaming, there are cases where a stream will be inactive from beginning to end and so will never call bt_iter_next. For those cases, we have to detect when we are reading an inactive stream which happens when all other active streams have been closed. We don't want to have the same checks in two places, so this fix removes the return of EAGAIN in bt_iter_next and instead returns a NULL event with the new BT_ITER_FLAG_RETRY flag set. Also, handle cases where streams and traces have no associated file descriptors. Signed-off-by: Julien Desfossez Signed-off-by: Mathieu Desnoyers --- diff --git a/formats/ctf/ctf.c b/formats/ctf/ctf.c index 16a7ba0f..122b708f 100644 --- a/formats/ctf/ctf.c +++ b/formats/ctf/ctf.c @@ -472,12 +472,16 @@ int ctf_read_event(struct bt_stream_pos *ppos, struct ctf_stream_definition *str if (unlikely(pos->offset == EOF)) return EOF; - if (pos->content_size == 0) { - /* Stream is inactive for now (live reading). */ + /* Stream is inactive for now (live reading). */ + if (unlikely(pos->content_size == 0)) return EAGAIN; - } - /* Packet only contains headers */ - if (pos->offset == pos->content_size) + + /* + * Packet seeked to by ctf_pos_get_event() only contains + * headers, no event. Consider stream as inactive (live + * reading). + */ + if (unlikely(pos->data_offset == pos->content_size)) return EAGAIN; assert(pos->offset < pos->content_size); @@ -957,9 +961,10 @@ void ctf_packet_seek(struct bt_stream_pos *stream_pos, size_t index, int whence) pos->content_size = packet_index->content_size; pos->packet_size = packet_index->packet_size; pos->mmap_offset = packet_index->offset; - if (packet_index->data_offset < packet_index->content_size) { + pos->data_offset = packet_index->data_offset; + if (pos->data_offset < packet_index->content_size) { pos->offset = 0; /* will read headers */ - } else if (packet_index->data_offset == packet_index->content_size) { + } else if (pos->data_offset == packet_index->content_size) { /* empty packet */ pos->offset = packet_index->data_offset; whence = SEEK_CUR; @@ -2386,10 +2391,12 @@ int ctf_close_file_stream(struct ctf_file_stream *file_stream) fprintf(stderr, "Error on ctf_fini_pos\n"); return -1; } - ret = close(file_stream->pos.fd); - if (ret) { - perror("Error closing file fd"); - return -1; + if (file_stream->pos.fd >= 0) { + ret = close(file_stream->pos.fd); + if (ret) { + perror("Error closing file fd"); + return -1; + } } return 0; } @@ -2421,15 +2428,19 @@ int ctf_close_trace(struct bt_trace_descriptor *tdp) } } ctf_destroy_metadata(td); - ret = close(td->dirfd); - if (ret) { - perror("Error closing dirfd"); - return ret; + if (td->dirfd >= 0) { + ret = close(td->dirfd); + if (ret) { + perror("Error closing dirfd"); + return ret; + } } - ret = closedir(td->dir); - if (ret) { - perror("Error closedir"); - return ret; + if (td->dir) { + ret = closedir(td->dir); + if (ret) { + perror("Error closedir"); + return ret; + } } free(td->metadata_string); g_free(td); diff --git a/formats/ctf/iterator.c b/formats/ctf/iterator.c index d1152fca..69bb0f4b 100644 --- a/formats/ctf/iterator.c +++ b/formats/ctf/iterator.c @@ -119,12 +119,30 @@ struct bt_ctf_event *bt_ctf_iter_read_event_flags(struct bt_ctf_iter *iter, */ assert(iter); + if (flags) + *flags = 0; + ret = &iter->current_ctf_event; file_stream = bt_heap_maximum(iter->parent.stream_heap); if (!file_stream) { /* end of file for all streams */ goto stop; } + + /* + * If the packet is empty (contains only headers or is of size 0), the + * caller has to know that we can't read the current event and we need + * to do a bt_iter_next. + */ + if (file_stream->pos.data_offset == file_stream->pos.content_size + || file_stream->pos.content_size == 0) { + /* More events may come. */ + ret = NULL; + if (flags) + *flags |= BT_ITER_FLAG_RETRY; + goto end; + } + stream = &file_stream->parent; if (iter->parent.end_pos && iter->parent.end_pos->type == BT_SEEK_TIME && @@ -134,8 +152,6 @@ struct bt_ctf_event *bt_ctf_iter_read_event_flags(struct bt_ctf_iter *iter, ret->parent = g_ptr_array_index(stream->events_by_id, stream->event_id); - if (flags) - *flags = 0; if (!file_stream->pos.packet_cycles_index) packet_index = NULL; else diff --git a/include/babeltrace/ctf/types.h b/include/babeltrace/ctf/types.h index 06c130f8..338dad16 100644 --- a/include/babeltrace/ctf/types.h +++ b/include/babeltrace/ctf/types.h @@ -76,6 +76,7 @@ struct ctf_stream_pos { struct mmap_align *base_mma;/* mmap base address */ int64_t offset; /* offset from base, in bits. EOF for end of file. */ int64_t last_offset; /* offset before the last read_event */ + int64_t data_offset; /* offset of data in current packet */ uint64_t cur_index; /* current index in packet index */ uint64_t last_events_discarded; /* last known amount of event discarded */ void (*packet_seek)(struct bt_stream_pos *pos, size_t index, diff --git a/include/babeltrace/iterator.h b/include/babeltrace/iterator.h index 50232a90..5c3939c3 100644 --- a/include/babeltrace/iterator.h +++ b/include/babeltrace/iterator.h @@ -35,6 +35,7 @@ extern "C" { /* Flags for the iterator read_event */ enum { BT_ITER_FLAG_LOST_EVENTS = (1 << 0), + BT_ITER_FLAG_RETRY = (1 << 1), }; /* Forward declarations */ diff --git a/lib/iterator.c b/lib/iterator.c index a2a7bb57..853898d3 100644 --- a/lib/iterator.c +++ b/lib/iterator.c @@ -806,9 +806,18 @@ int bt_iter_next(struct bt_iter *iter) goto end; } else if (ret == EAGAIN) { /* - * The stream is inactive for now, we just updated the timestamp_end - * to skip over this stream up to a certain point in time. + * Live streaming: the stream is inactive for now, we + * just updated the timestamp_end to skip over this + * stream up to a certain point in time. + * + * Since we can't guarantee that a stream will ever have + * any activity, we can't rely on the fact that + * bt_iter_next will be called for each stream and deal + * with inactive streams. So instead, we return 0 here + * to the caller and let the read API handle the + * retry case. */ + ret = 0; goto reinsert; } else if (ret) { goto end; @@ -820,11 +829,6 @@ reinsert: assert(removed == file_stream); file_stream = bt_heap_maximum(iter->stream_heap); - if (file_stream->pos.content_size == 0) { - ret = EAGAIN; - } else { - ret = 0; - } end: return ret;