In live streaming, there are cases where a stream will be inactive from
beginning to end and so will never call bt_iter_next. For those cases,
we have to detect when we are reading an inactive stream which happens
when all other active streams have been closed.
We don't want to have the same checks in two places, so this fix removes
the return of EAGAIN in bt_iter_next and instead returns a NULL event
with the new BT_ITER_FLAG_RETRY flag set.
Also, handle cases where streams and traces have no associated file
descriptors.
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
if (unlikely(pos->offset == EOF))
return EOF;
if (unlikely(pos->offset == EOF))
return EOF;
- if (pos->content_size == 0) {
- /* Stream is inactive for now (live reading). */
+ /* Stream is inactive for now (live reading). */
+ if (unlikely(pos->content_size == 0))
- }
- /* Packet only contains headers */
- if (pos->offset == pos->content_size)
+
+ /*
+ * Packet seeked to by ctf_pos_get_event() only contains
+ * headers, no event. Consider stream as inactive (live
+ * reading).
+ */
+ if (unlikely(pos->data_offset == pos->content_size))
return EAGAIN;
assert(pos->offset < pos->content_size);
return EAGAIN;
assert(pos->offset < pos->content_size);
pos->content_size = packet_index->content_size;
pos->packet_size = packet_index->packet_size;
pos->mmap_offset = packet_index->offset;
pos->content_size = packet_index->content_size;
pos->packet_size = packet_index->packet_size;
pos->mmap_offset = packet_index->offset;
- if (packet_index->data_offset < packet_index->content_size) {
+ pos->data_offset = packet_index->data_offset;
+ if (pos->data_offset < packet_index->content_size) {
pos->offset = 0; /* will read headers */
pos->offset = 0; /* will read headers */
- } else if (packet_index->data_offset == packet_index->content_size) {
+ } else if (pos->data_offset == packet_index->content_size) {
/* empty packet */
pos->offset = packet_index->data_offset;
whence = SEEK_CUR;
/* empty packet */
pos->offset = packet_index->data_offset;
whence = SEEK_CUR;
fprintf(stderr, "Error on ctf_fini_pos\n");
return -1;
}
fprintf(stderr, "Error on ctf_fini_pos\n");
return -1;
}
- ret = close(file_stream->pos.fd);
- if (ret) {
- perror("Error closing file fd");
- return -1;
+ if (file_stream->pos.fd >= 0) {
+ ret = close(file_stream->pos.fd);
+ if (ret) {
+ perror("Error closing file fd");
+ return -1;
+ }
}
}
ctf_destroy_metadata(td);
}
}
ctf_destroy_metadata(td);
- ret = close(td->dirfd);
- if (ret) {
- perror("Error closing dirfd");
- return ret;
+ if (td->dirfd >= 0) {
+ ret = close(td->dirfd);
+ if (ret) {
+ perror("Error closing dirfd");
+ return ret;
+ }
- ret = closedir(td->dir);
- if (ret) {
- perror("Error closedir");
- return ret;
+ if (td->dir) {
+ ret = closedir(td->dir);
+ if (ret) {
+ perror("Error closedir");
+ return ret;
+ }
}
free(td->metadata_string);
g_free(td);
}
free(td->metadata_string);
g_free(td);
+ if (flags)
+ *flags = 0;
+
ret = &iter->current_ctf_event;
file_stream = bt_heap_maximum(iter->parent.stream_heap);
if (!file_stream) {
/* end of file for all streams */
goto stop;
}
ret = &iter->current_ctf_event;
file_stream = bt_heap_maximum(iter->parent.stream_heap);
if (!file_stream) {
/* end of file for all streams */
goto stop;
}
+
+ /*
+ * If the packet is empty (contains only headers or is of size 0), the
+ * caller has to know that we can't read the current event and we need
+ * to do a bt_iter_next.
+ */
+ if (file_stream->pos.data_offset == file_stream->pos.content_size
+ || file_stream->pos.content_size == 0) {
+ /* More events may come. */
+ ret = NULL;
+ if (flags)
+ *flags |= BT_ITER_FLAG_RETRY;
+ goto end;
+ }
+
stream = &file_stream->parent;
if (iter->parent.end_pos &&
iter->parent.end_pos->type == BT_SEEK_TIME &&
stream = &file_stream->parent;
if (iter->parent.end_pos &&
iter->parent.end_pos->type == BT_SEEK_TIME &&
ret->parent = g_ptr_array_index(stream->events_by_id,
stream->event_id);
ret->parent = g_ptr_array_index(stream->events_by_id,
stream->event_id);
- if (flags)
- *flags = 0;
if (!file_stream->pos.packet_cycles_index)
packet_index = NULL;
else
if (!file_stream->pos.packet_cycles_index)
packet_index = NULL;
else
struct mmap_align *base_mma;/* mmap base address */
int64_t offset; /* offset from base, in bits. EOF for end of file. */
int64_t last_offset; /* offset before the last read_event */
struct mmap_align *base_mma;/* mmap base address */
int64_t offset; /* offset from base, in bits. EOF for end of file. */
int64_t last_offset; /* offset before the last read_event */
+ int64_t data_offset; /* offset of data in current packet */
uint64_t cur_index; /* current index in packet index */
uint64_t last_events_discarded; /* last known amount of event discarded */
void (*packet_seek)(struct bt_stream_pos *pos, size_t index,
uint64_t cur_index; /* current index in packet index */
uint64_t last_events_discarded; /* last known amount of event discarded */
void (*packet_seek)(struct bt_stream_pos *pos, size_t index,
/* Flags for the iterator read_event */
enum {
BT_ITER_FLAG_LOST_EVENTS = (1 << 0),
/* Flags for the iterator read_event */
enum {
BT_ITER_FLAG_LOST_EVENTS = (1 << 0),
+ BT_ITER_FLAG_RETRY = (1 << 1),
};
/* Forward declarations */
};
/* Forward declarations */
goto end;
} else if (ret == EAGAIN) {
/*
goto end;
} else if (ret == EAGAIN) {
/*
- * The stream is inactive for now, we just updated the timestamp_end
- * to skip over this stream up to a certain point in time.
+ * Live streaming: the stream is inactive for now, we
+ * just updated the timestamp_end to skip over this
+ * stream up to a certain point in time.
+ *
+ * Since we can't guarantee that a stream will ever have
+ * any activity, we can't rely on the fact that
+ * bt_iter_next will be called for each stream and deal
+ * with inactive streams. So instead, we return 0 here
+ * to the caller and let the read API handle the
+ * retry case.
goto reinsert;
} else if (ret) {
goto end;
goto reinsert;
} else if (ret) {
goto end;
assert(removed == file_stream);
file_stream = bt_heap_maximum(iter->stream_heap);
assert(removed == file_stream);
file_stream = bt_heap_maximum(iter->stream_heap);
- if (file_stream->pos.content_size == 0) {
- ret = EAGAIN;
- } else {
- ret = 0;
- }