Consumer: strip ring buffer header when consuming ctf2 ring buffer packet
[lttng-tools.git] / src / common / consumer / consumer-stream.cpp
index 33f97c77e66a2e074718ad8ce81b8073c3abe7e8..f7437947d3612d78e78ddbb86fee6ab81cb9e432 100644 (file)
 #include <sys/mman.h>
 #include <unistd.h>
 
+#include <common/buffer-view.hpp>
 #include <common/common.hpp>
 #include <common/consumer/consumer-timer.hpp>
-#include <common/consumer/consumer-timer.hpp>
-#include <common/consumer/consumer.hpp>
 #include <common/consumer/consumer.hpp>
 #include <common/consumer/metadata-bucket.hpp>
-#include <common/consumer/metadata-bucket.hpp>
 #include <common/index/index.hpp>
 #include <common/kernel-consumer/kernel-consumer.hpp>
 #include <common/kernel-ctl/kernel-ctl.hpp>
 
 #include "consumer-stream.hpp"
 
+struct metadata_packet_header {
+       uint32_t magic; /* 0x75D11D57 */
+       uint8_t uuid[16]; /* Unique Universal Identifier */
+       uint32_t checksum; /* 0 if unused */
+       uint32_t content_size; /* in bits */
+       uint32_t packet_size; /* in bits */
+       uint8_t compression_scheme; /* 0 if unused */
+       uint8_t encryption_scheme; /* 0 if unused */
+       uint8_t checksum_scheme; /* 0 if unused */
+       uint8_t major; /* CTF spec major version number */
+       uint8_t minor; /* CTF spec minor version number */
+       uint8_t header_end[0];
+};
+
+static size_t metadata_length(void)
+{
+       return offsetof(struct metadata_packet_header, header_end);
+}
+
 /*
  * RCU call to free stream. MUST only be used with call_rcu().
  */
@@ -80,11 +97,12 @@ static void consumer_stream_metadata_assert_locked_all(struct lttng_consumer_str
 }
 
 /* Only used for data streams. */
-static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
-               const struct stream_subbuffer *subbuf)
+static int consumer_stream_update_stats(
+               struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuf_)
 {
        int ret = 0;
        uint64_t sequence_number;
+       const stream_subbuffer *subbuf = subbuf_;
        const uint64_t discarded_events = subbuf->info.data.events_discarded;
 
        if (!subbuf->info.data.sequence_number.is_set) {
@@ -469,12 +487,11 @@ end:
  * of the metadata stream in the kernel. If it was updated, set the reset flag
  * on the stream.
  */
-static
-int metadata_stream_check_version(struct lttng_consumer_stream *stream,
-       const struct stream_subbuffer *subbuffer)
+static void metadata_stream_check_version(
+               struct lttng_consumer_stream *stream, const struct stream_subbuffer *subbuffer)
 {
        if (stream->metadata_version == subbuffer->info.metadata.version) {
-               goto end;
+               return;
        }
 
        DBG("New metadata version detected");
@@ -484,8 +501,35 @@ int metadata_stream_check_version(struct lttng_consumer_stream *stream,
        if (stream->read_subbuffer_ops.reset_metadata) {
                stream->read_subbuffer_ops.reset_metadata(stream);
        }
+}
 
-end:
+static void strip_packet_header_from_subbuffer(struct stream_subbuffer *buffer)
+{
+       /*
+        * Change the view and hide the packer header and padding from the view
+        */
+       size_t new_subbuf_size = buffer->info.metadata.subbuf_size - metadata_length();
+
+       buffer->buffer.buffer = lttng_buffer_view_from_view(
+                       &buffer->buffer.buffer, metadata_length(), new_subbuf_size);
+
+       buffer->info.metadata.subbuf_size = new_subbuf_size;
+       /* Padding is not present in the view anymore */
+       buffer->info.metadata.padded_subbuf_size = new_subbuf_size;
+}
+
+static int metadata_stream_pre_consume_ctf1(
+               struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer)
+{
+       (void) metadata_stream_check_version(stream, subbuffer);
+       return 0;
+}
+
+static int metadata_stream_pre_consume_ctf2(
+               struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer)
+{
+       (void) metadata_stream_check_version(stream, subbuffer);
+       (void) strip_packet_header_from_subbuffer(subbuffer);
        return 0;
 }
 
@@ -638,8 +682,7 @@ end:
        return ret;
 }
 
-struct lttng_consumer_stream *consumer_stream_create(
-               struct lttng_consumer_channel *channel,
+struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_channel *channel,
                uint64_t channel_key,
                uint64_t stream_key,
                const char *channel_name,
@@ -649,7 +692,8 @@ struct lttng_consumer_stream *consumer_stream_create(
                int cpu,
                int *alloc_ret,
                enum consumer_channel_type type,
-               unsigned int monitor)
+               unsigned int monitor,
+               int trace_format)
 {
        int ret;
        struct lttng_consumer_stream *stream;
@@ -747,8 +791,15 @@ struct lttng_consumer_stream *consumer_stream_create(
                                consumer_stream_metadata_unlock_all;
                stream->read_subbuffer_ops.assert_locked =
                                consumer_stream_metadata_assert_locked_all;
-               stream->read_subbuffer_ops.pre_consume_subbuffer =
-                               metadata_stream_check_version;
+               if (trace_format == 1) {
+                       stream->read_subbuffer_ops.pre_consume_subbuffer =
+                                       metadata_stream_pre_consume_ctf1;
+               } else if (trace_format == 2) {
+                       stream->read_subbuffer_ops.pre_consume_subbuffer =
+                                       metadata_stream_pre_consume_ctf2;
+               } else {
+                       abort();
+               }
        } else {
                const post_consume_cb post_consume_index_op = channel->is_live ?
                                consumer_stream_sync_metadata_index :
@@ -1075,10 +1126,12 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
                if (stream->globally_visible) {
                        pthread_mutex_lock(&the_consumer_data.lock);
                        pthread_mutex_lock(&stream->chan->lock);
+
                        pthread_mutex_lock(&stream->lock);
                        /* Remove every reference of the stream in the consumer. */
                        consumer_stream_delete(stream, ht);
 
+
                        destroy_close_stream(stream);
 
                        /* Update channel's refcount of the stream. */
This page took 0.056042 seconds and 5 git commands to generate.