Consumer: strip ring buffer header when consuming ctf2 ring buffer packet
[lttng-tools.git] / src / common / consumer / consumer-stream.cpp
index f9807cb0173f4bf6f4a7c0e9f838afb4631c16ec..f7437947d3612d78e78ddbb86fee6ab81cb9e432 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
  * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
  *
 #include <sys/mman.h>
 #include <unistd.h>
 
-#include <common/common.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/metadata-bucket.h>
-#include <common/consumer/metadata-bucket.h>
-#include <common/index/index.h>
-#include <common/kernel-consumer/kernel-consumer.h>
-#include <common/kernel-ctl/kernel-ctl.h>
-#include <common/macros.h>
-#include <common/relayd/relayd.h>
-#include <common/ust-consumer/ust-consumer.h>
-#include <common/utils.h>
-
-#include "consumer-stream.h"
+#include <common/buffer-view.hpp>
+#include <common/common.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/consumer/metadata-bucket.hpp>
+#include <common/index/index.hpp>
+#include <common/kernel-consumer/kernel-consumer.hpp>
+#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/macros.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/ust-consumer/ust-consumer.hpp>
+#include <common/utils.hpp>
+
+#include "consumer-stream.hpp"
+
+struct metadata_packet_header {
+       uint32_t magic; /* 0x75D11D57 */
+       uint8_t uuid[16]; /* Unique Universal Identifier */
+       uint32_t checksum; /* 0 if unused */
+       uint32_t content_size; /* in bits */
+       uint32_t packet_size; /* in bits */
+       uint8_t compression_scheme; /* 0 if unused */
+       uint8_t encryption_scheme; /* 0 if unused */
+       uint8_t checksum_scheme; /* 0 if unused */
+       uint8_t major; /* CTF spec major version number */
+       uint8_t minor; /* CTF spec minor version number */
+       uint8_t header_end[0];
+};
+
+static size_t metadata_length(void)
+{
+       return offsetof(struct metadata_packet_header, header_end);
+}
 
 /*
  * RCU call to free stream. MUST only be used with call_rcu().
@@ -35,9 +52,9 @@
 static void free_stream_rcu(struct rcu_head *head)
 {
        struct lttng_ht_node_u64 *node =
-               caa_container_of(head, struct lttng_ht_node_u64, head);
+               lttng::utils::container_of(head, &lttng_ht_node_u64::head);
        struct lttng_consumer_stream *stream =
-               caa_container_of(node, struct lttng_consumer_stream, node);
+               lttng::utils::container_of(node, &lttng_consumer_stream::node);
 
        pthread_mutex_destroy(&stream->lock);
        free(stream);
@@ -80,11 +97,12 @@ static void consumer_stream_metadata_assert_locked_all(struct lttng_consumer_str
 }
 
 /* Only used for data streams. */
-static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
-               const struct stream_subbuffer *subbuf)
+static int consumer_stream_update_stats(
+               struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuf_)
 {
        int ret = 0;
        uint64_t sequence_number;
+       const stream_subbuffer *subbuf = subbuf_;
        const uint64_t discarded_events = subbuf->info.data.events_discarded;
 
        if (!subbuf->info.data.sequence_number.is_set) {
@@ -160,7 +178,7 @@ void ctf_packet_index_populate(struct ctf_packet_index *index,
 }
 
 static ssize_t consumer_stream_consume_mmap(
-               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_local_data *ctx __attribute__((unused)),
                struct lttng_consumer_stream *stream,
                const struct stream_subbuffer *subbuffer)
 {
@@ -231,7 +249,7 @@ static ssize_t consumer_stream_consume_splice(
 static int consumer_stream_send_index(
                struct lttng_consumer_stream *stream,
                const struct stream_subbuffer *subbuffer,
-               struct lttng_consumer_local_data *ctx)
+               struct lttng_consumer_local_data *ctx __attribute__((unused)))
 {
        off_t packet_offset = 0;
        struct ctf_packet_index index = {};
@@ -469,12 +487,11 @@ end:
  * of the metadata stream in the kernel. If it was updated, set the reset flag
  * on the stream.
  */
-static
-int metadata_stream_check_version(struct lttng_consumer_stream *stream,
-       const struct stream_subbuffer *subbuffer)
+static void metadata_stream_check_version(
+               struct lttng_consumer_stream *stream, const struct stream_subbuffer *subbuffer)
 {
        if (stream->metadata_version == subbuffer->info.metadata.version) {
-               goto end;
+               return;
        }
 
        DBG("New metadata version detected");
@@ -484,8 +501,35 @@ int metadata_stream_check_version(struct lttng_consumer_stream *stream,
        if (stream->read_subbuffer_ops.reset_metadata) {
                stream->read_subbuffer_ops.reset_metadata(stream);
        }
+}
 
-end:
+static void strip_packet_header_from_subbuffer(struct stream_subbuffer *buffer)
+{
+       /*
+        * Change the view and hide the packer header and padding from the view
+        */
+       size_t new_subbuf_size = buffer->info.metadata.subbuf_size - metadata_length();
+
+       buffer->buffer.buffer = lttng_buffer_view_from_view(
+                       &buffer->buffer.buffer, metadata_length(), new_subbuf_size);
+
+       buffer->info.metadata.subbuf_size = new_subbuf_size;
+       /* Padding is not present in the view anymore */
+       buffer->info.metadata.padded_subbuf_size = new_subbuf_size;
+}
+
+static int metadata_stream_pre_consume_ctf1(
+               struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer)
+{
+       (void) metadata_stream_check_version(stream, subbuffer);
+       return 0;
+}
+
+static int metadata_stream_pre_consume_ctf2(
+               struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer)
+{
+       (void) metadata_stream_check_version(stream, subbuffer);
+       (void) strip_packet_header_from_subbuffer(subbuffer);
        return 0;
 }
 
@@ -591,8 +635,8 @@ end:
  */
 static
 int post_consume_open_new_packet(struct lttng_consumer_stream *stream,
-               const struct stream_subbuffer *subbuffer,
-               struct lttng_consumer_local_data *ctx)
+               const struct stream_subbuffer *subbuffer __attribute__((unused)),
+               struct lttng_consumer_local_data *ctx __attribute__((unused)))
 {
        int ret = 0;
 
@@ -638,8 +682,7 @@ end:
        return ret;
 }
 
-struct lttng_consumer_stream *consumer_stream_create(
-               struct lttng_consumer_channel *channel,
+struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_channel *channel,
                uint64_t channel_key,
                uint64_t stream_key,
                const char *channel_name,
@@ -649,12 +692,13 @@ struct lttng_consumer_stream *consumer_stream_create(
                int cpu,
                int *alloc_ret,
                enum consumer_channel_type type,
-               unsigned int monitor)
+               unsigned int monitor,
+               int trace_format)
 {
        int ret;
        struct lttng_consumer_stream *stream;
 
-       stream = (lttng_consumer_stream *) zmalloc(sizeof(*stream));
+       stream = zmalloc<lttng_consumer_stream>();
        if (stream == NULL) {
                PERROR("malloc struct lttng_consumer_stream");
                ret = -ENOMEM;
@@ -669,6 +713,7 @@ struct lttng_consumer_stream *consumer_stream_create(
                goto error;
        }
 
+       stream->send_node = CDS_LIST_HEAD_INIT(stream->send_node);
        stream->chan = channel;
        stream->key = stream_key;
        stream->trace_chunk = trace_chunk;
@@ -746,8 +791,15 @@ struct lttng_consumer_stream *consumer_stream_create(
                                consumer_stream_metadata_unlock_all;
                stream->read_subbuffer_ops.assert_locked =
                                consumer_stream_metadata_assert_locked_all;
-               stream->read_subbuffer_ops.pre_consume_subbuffer =
-                               metadata_stream_check_version;
+               if (trace_format == 1) {
+                       stream->read_subbuffer_ops.pre_consume_subbuffer =
+                                       metadata_stream_pre_consume_ctf1;
+               } else if (trace_format == 2) {
+                       stream->read_subbuffer_ops.pre_consume_subbuffer =
+                                       metadata_stream_pre_consume_ctf2;
+               } else {
+                       abort();
+               }
        } else {
                const post_consume_cb post_consume_index_op = channel->is_live ?
                                consumer_stream_sync_metadata_index :
@@ -1060,6 +1112,8 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
 {
        LTTNG_ASSERT(stream);
 
+       cds_list_del_init(&stream->send_node);
+
        /* Stream is in monitor mode. */
        if (stream->monitor) {
                struct lttng_consumer_channel *free_chan = NULL;
@@ -1072,10 +1126,12 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                if (stream->globally_visible) {
                        pthread_mutex_lock(&the_consumer_data.lock);
                        pthread_mutex_lock(&stream->chan->lock);
+
                        pthread_mutex_lock(&stream->lock);
                        /* Remove every reference of the stream in the consumer. */
                        consumer_stream_delete(stream, ht);
 
+
                        destroy_close_stream(stream);
 
                        /* Update channel's refcount of the stream. */
@@ -1092,6 +1148,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                         * If the stream is not visible globally, this needs to be done
                         * outside of the consumer data lock section.
                         */
+                       destroy_close_stream(stream);
                        free_chan = unref_channel(stream);
                }
 
@@ -1270,7 +1327,7 @@ end:
 }
 
 static ssize_t metadata_bucket_consume(
-               struct lttng_consumer_local_data *unused,
+               struct lttng_consumer_local_data *unused __attribute__((unused)),
                struct lttng_consumer_stream *stream,
                const struct stream_subbuffer *subbuffer)
 {
This page took 0.029087 seconds and 5 git commands to generate.