if (unlikely(pos->offset == EOF))
return EOF;
- if (pos->content_size == 0) {
- /* Stream is inactive for now (live reading). */
+ /* Stream is inactive for now (live reading). */
+ if (unlikely(pos->content_size == 0))
return EAGAIN;
- }
- /* Packet only contains headers */
- if (pos->offset == pos->content_size)
+
+ /*
+ * Packet seeked to by ctf_pos_get_event() only contains
+ * headers, no event. Consider stream as inactive (live
+ * reading).
+ */
+ if (unlikely(pos->data_offset == pos->content_size))
return EAGAIN;
assert(pos->offset < pos->content_size);
pos->content_size = packet_index->content_size;
pos->packet_size = packet_index->packet_size;
pos->mmap_offset = packet_index->offset;
- if (packet_index->data_offset < packet_index->content_size) {
+ pos->data_offset = packet_index->data_offset;
+ if (pos->data_offset < packet_index->content_size) {
pos->offset = 0; /* will read headers */
- } else if (packet_index->data_offset == packet_index->content_size) {
+ } else if (pos->data_offset == packet_index->content_size) {
/* empty packet */
pos->offset = packet_index->data_offset;
whence = SEEK_CUR;
fprintf(stderr, "Error on ctf_fini_pos\n");
return -1;
}
- ret = close(file_stream->pos.fd);
- if (ret) {
- perror("Error closing file fd");
- return -1;
+ if (file_stream->pos.fd >= 0) {
+ ret = close(file_stream->pos.fd);
+ if (ret) {
+ perror("Error closing file fd");
+ return -1;
+ }
}
return 0;
}
}
}
ctf_destroy_metadata(td);
- ret = close(td->dirfd);
- if (ret) {
- perror("Error closing dirfd");
- return ret;
+ if (td->dirfd >= 0) {
+ ret = close(td->dirfd);
+ if (ret) {
+ perror("Error closing dirfd");
+ return ret;
+ }
}
- ret = closedir(td->dir);
- if (ret) {
- perror("Error closedir");
- return ret;
+ if (td->dir) {
+ ret = closedir(td->dir);
+ if (ret) {
+ perror("Error closedir");
+ return ret;
+ }
}
free(td->metadata_string);
g_free(td);
*/
assert(iter);
+ if (flags)
+ *flags = 0;
+
ret = &iter->current_ctf_event;
file_stream = bt_heap_maximum(iter->parent.stream_heap);
if (!file_stream) {
/* end of file for all streams */
goto stop;
}
+
+ /*
+ * If the packet is empty (contains only headers or is of size 0), the
+ * caller has to know that we can't read the current event and we need
+ * to do a bt_iter_next.
+ */
+ if (file_stream->pos.data_offset == file_stream->pos.content_size
+ || file_stream->pos.content_size == 0) {
+ /* More events may come. */
+ ret = NULL;
+ if (flags)
+ *flags |= BT_ITER_FLAG_RETRY;
+ goto end;
+ }
+
stream = &file_stream->parent;
if (iter->parent.end_pos &&
iter->parent.end_pos->type == BT_SEEK_TIME &&
ret->parent = g_ptr_array_index(stream->events_by_id,
stream->event_id);
- if (flags)
- *flags = 0;
if (!file_stream->pos.packet_cycles_index)
packet_index = NULL;
else
struct mmap_align *base_mma;/* mmap base address */
int64_t offset; /* offset from base, in bits. EOF for end of file. */
int64_t last_offset; /* offset before the last read_event */
+ int64_t data_offset; /* offset of data in current packet */
uint64_t cur_index; /* current index in packet index */
uint64_t last_events_discarded; /* last known amount of event discarded */
void (*packet_seek)(struct bt_stream_pos *pos, size_t index,
/* Flags for the iterator read_event */
enum {
BT_ITER_FLAG_LOST_EVENTS = (1 << 0),
+ BT_ITER_FLAG_RETRY = (1 << 1),
};
/* Forward declarations */
goto end;
} else if (ret == EAGAIN) {
/*
- * The stream is inactive for now, we just updated the timestamp_end
- * to skip over this stream up to a certain point in time.
+ * Live streaming: the stream is inactive for now, we
+ * just updated the timestamp_end to skip over this
+ * stream up to a certain point in time.
+ *
+ * Since we can't guarantee that a stream will ever have
+ * any activity, we can't rely on the fact that
+ * bt_iter_next will be called for each stream and deal
+ * with inactive streams. So instead, we return 0 here
+ * to the caller and let the read API handle the
+ * retry case.
*/
+ ret = 0;
goto reinsert;
} else if (ret) {
goto end;
assert(removed == file_stream);
file_stream = bt_heap_maximum(iter->stream_heap);
- if (file_stream->pos.content_size == 0) {
- ret = EAGAIN;
- } else {
- ret = 0;
- }
end:
return ret;