#include <common/utils.h>
#include <common/index/index.h>
#include <common/consumer/consumer.h>
+#include <common/optional.h>
#include "ust-consumer.h"
subbuf_view = lttng_buffer_view_init(
subbuf_addr, 0, padded_len);
- read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+ read_len = lttng_consumer_on_read_subbuffer_mmap(
stream, &subbuf_view, padded_len - len);
if (use_relayd) {
if (read_len != len) {
assert(write_len != 0);
if (write_len < 0) {
ERR("Writing one metadata packet");
- ret = -1;
+ ret = write_len;
goto end;
}
stream->ust_metadata_pushed += write_len;
struct stream_subbuffer *subbuffer)
{
int ret;
+ bool cache_empty;
+ bool got_subbuffer;
+ bool coherent;
+ bool buffer_empty;
+ unsigned long consumed_pos, produced_pos;
- ret = ustctl_get_next_subbuf(stream->ustream);
- if (ret) {
- ret = commit_one_metadata_packet(stream);
- if (ret < 0) {
- goto end;
- } else if (ret == 0) {
- /* Not an error, the cache is empty. */
- ret = -ENODATA;
- goto end;
+ do {
+ ret = ustctl_get_next_subbuf(stream->ustream);
+ if (ret == 0) {
+ got_subbuffer = true;
+ } else {
+ got_subbuffer = false;
+ if (ret != -EAGAIN) {
+ /* Fatal error. */
+ goto end;
+ }
}
- ret = ustctl_get_next_subbuf(stream->ustream);
- if (ret) {
- goto end;
+ /*
+ * Determine if the cache is empty and ensure that a sub-buffer
+ * is made available if the cache is not empty.
+ */
+ if (!got_subbuffer) {
+ ret = commit_one_metadata_packet(stream);
+ if (ret < 0 && ret != -ENOBUFS) {
+ goto end;
+ } else if (ret == 0) {
+ /* Not an error, the cache is empty. */
+ cache_empty = true;
+ ret = -ENODATA;
+ goto end;
+ } else {
+ cache_empty = false;
+ }
+ } else {
+ pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+ cache_empty = stream->chan->metadata_cache->max_offset ==
+ stream->ust_metadata_pushed;
+ pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
}
- }
+ } while (!got_subbuffer);
+ /* Populate sub-buffer infos and view. */
ret = get_next_subbuffer_common(stream, subbuffer);
if (ret) {
goto end;
}
+
+ ret = lttng_ustconsumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ /*
+ * -EAGAIN is not expected since we got a sub-buffer and haven't
+ * pushed the consumption position yet (on put_next).
+ */
+ PERROR("Failed to take a snapshot of metadata buffer positions");
+ goto end;
+ }
+
+ ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
+ if (ret) {
+ PERROR("Failed to get metadata consumed position");
+ goto end;
+ }
+
+ ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
+ if (ret) {
+ PERROR("Failed to get metadata produced position");
+ goto end;
+ }
+
+ /* Last sub-buffer of the ring buffer ? */
+ buffer_empty = (consumed_pos + stream->max_sb_size) == produced_pos;
+
+ /*
+ * The sessiond registry lock ensures that coherent units of metadata
+ * are pushed to the consumer daemon at once. Hence, if a sub-buffer is
+ * acquired, the cache is empty, and it is the only available sub-buffer
+ * available, it is safe to assume that it is "coherent".
+ */
+ coherent = got_subbuffer && cache_empty && buffer_empty;
+
+ LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
end:
return ret;
}
return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
}
-static void lttng_ustconsumer_set_stream_ops(
+static int lttng_ustconsumer_set_stream_ops(
struct lttng_consumer_stream *stream)
{
+ int ret = 0;
+
stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up;
if (stream->metadata_flag) {
stream->read_subbuffer_ops.get_next_subbuffer =
extract_metadata_subbuffer_info;
stream->read_subbuffer_ops.reset_metadata =
metadata_stream_reset_cache;
- stream->read_subbuffer_ops.on_sleep = signal_metadata;
+ if (stream->chan->is_live) {
+ stream->read_subbuffer_ops.on_sleep = signal_metadata;
+ ret = consumer_stream_enable_metadata_bucketization(
+ stream);
+ if (ret) {
+ goto end;
+ }
+ }
} else {
stream->read_subbuffer_ops.get_next_subbuffer =
get_next_subbuffer;
}
stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
+end:
+ return ret;
}
/*