#include <inttypes.h>
#include <unistd.h>
#include <sys/stat.h>
+#include <stdint.h>
#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/optional.h>
#include <common/buffer-view.h>
#include <common/consumer/consumer.h>
-#include <stdint.h>
+#include <common/consumer/metadata-bucket.h>
#include "kernel-consumer.h"
subbuf_view = lttng_buffer_view_init(
subbuf_addr, 0, padded_len);
- read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+ read_len = lttng_consumer_on_read_subbuffer_mmap(
stream, &subbuf_view,
padded_len - len);
/*
return ret;
}
+static
+int get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
+ struct stream_subbuffer *subbuffer)
+{
+ int ret;
+ const char *addr;
+ bool coherent;
+
+ ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
+ &coherent);
+ if (ret) {
+ goto end;
+ }
+
+ ret = stream->read_subbuffer_ops.extract_subbuffer_info(
+ stream, subbuffer);
+ if (ret) {
+ goto end;
+ }
+
+ LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
+
+ ret = get_current_subbuf_addr(stream, &addr);
+ if (ret) {
+ goto end;
+ }
+
+ subbuffer->buffer.buffer = lttng_buffer_view_init(
+ addr, 0, subbuffer->info.data.padded_subbuf_size);
+ DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
+ subbuffer->info.metadata.padded_subbuf_size,
+ coherent ? "true" : "false");
+end:
+ return ret;
+}
+
static
int put_next_subbuffer(struct lttng_consumer_stream *stream,
struct stream_subbuffer *subbuffer)
return ret;
}
-static void lttng_kconsumer_set_stream_ops(
+static
+bool is_get_next_check_metadata_available(int tracer_fd)
+{
+ return kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL) !=
+ -ENOTTY;
+}
+
+static
+int lttng_kconsumer_set_stream_ops(
struct lttng_consumer_stream *stream)
{
- if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
- stream->read_subbuffer_ops.get_next_subbuffer =
- get_next_subbuffer_mmap;
- } else {
- stream->read_subbuffer_ops.get_next_subbuffer =
- get_next_subbuffer_splice;
+ int ret = 0;
+
+ if (stream->metadata_flag && stream->chan->is_live) {
+ DBG("Attempting to enable metadata bucketization for live consumers");
+ if (is_get_next_check_metadata_available(stream->wait_fd)) {
+ DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
+ stream->read_subbuffer_ops.get_next_subbuffer =
+ get_next_subbuffer_metadata_check;
+ ret = consumer_stream_enable_metadata_bucketization(
+ stream);
+ if (ret) {
+ goto end;
+ }
+ } else {
+ /*
+ * The kernel tracer version is too old to indicate
+ * when the metadata stream has reached a "coherent"
+ * (parseable) point.
+ *
+ * This means that a live viewer may see an incoherent
+ * sequence of metadata and fail to parse it.
+ */
+ WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
+ metadata_bucket_destroy(stream->metadata_bucket);
+ stream->metadata_bucket = NULL;
+ }
+ }
+
+ if (!stream->read_subbuffer_ops.get_next_subbuffer) {
+ if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
+ stream->read_subbuffer_ops.get_next_subbuffer =
+ get_next_subbuffer_mmap;
+ } else {
+ stream->read_subbuffer_ops.get_next_subbuffer =
+ get_next_subbuffer_splice;
+ }
}
if (stream->metadata_flag) {
}
stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
+end:
+ return ret;
}
int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
}
}
- lttng_kconsumer_set_stream_ops(stream);
+ ret = lttng_kconsumer_set_stream_ops(stream);
+ if (ret) {
+ goto error_close_fd;
+ }
/* we return 0 to let the library handle the FD internally */
return 0;