Propagate trace format all the way to the consumer
authorJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Thu, 18 Aug 2022 20:39:37 +0000 (16:39 -0400)
committerJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Fri, 19 Aug 2022 15:54:32 +0000 (11:54 -0400)
TODO: currently we end up propagating and exposing the trace format as a
vulgar integer on lttng-consumerd side. This is one of the frontier we
discussed IRL.

TODO: use the trace format serialize and pass a complete trace format
object on the pipe

Note that this is necessary to later decide if the consumerd strips the
ringbuffer header or not based on the ctf version. As discussed IRL, ust
could be modified to expose an API allowing us to set the ctf version thus
propagating the ctf version inside the ringbuffer header and perofming
introspection at the consumer level and split based on the value. This
was simply easier to do since we have all the data available at hand.

Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Change-Id: I4d12692fe2cab8ad9bf22db9b1d455ec397fa843

src/bin/lttng-sessiond/consumer.cpp
src/bin/lttng-sessiond/consumer.hpp
src/bin/lttng-sessiond/kernel-consumer.cpp
src/common/consumer/consumer-stream.cpp
src/common/consumer/consumer-stream.hpp
src/common/consumer/consumer.cpp
src/common/consumer/consumer.hpp
src/common/kernel-consumer/kernel-consumer.cpp
src/common/sessiond-comm/sessiond-comm.hpp
src/common/ust-consumer/ust-consumer.cpp

index c0f8eb5894e8fae8cb070109d4820ebec62ddff2..a482dde8aff041cee9502cd7be23cb1f73f5ee43 100644 (file)
 
 #include <common/common.hpp>
 #include <common/defaults.hpp>
-#include <common/uri.hpp>
 #include <common/relayd/relayd.hpp>
 #include <common/string-utils/format.hpp>
+#include <common/uri.hpp>
+#include <lttng/trace-format-descriptor-internal.hpp>
 
 #include "consumer.hpp"
 #include "health-sessiond.hpp"
@@ -935,7 +936,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                const char *root_shm_path,
                const char *shm_path,
                struct lttng_trace_chunk *trace_chunk,
-               const struct lttng_credentials *buffer_credentials)
+               const struct lttng_credentials *buffer_credentials,
+               const lttng::trace_format_descriptor& trace_format)
 {
        LTTNG_ASSERT(msg);
 
@@ -978,6 +980,11 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.ask_channel.monitor = monitor;
        msg->u.ask_channel.ust_app_uid = ust_app_uid;
        msg->u.ask_channel.blocking_timeout = blocking_timeout;
+       if (trace_format.type() == LTTNG_TRACE_FORMAT_DESCRIPTOR_TYPE_CTF_1) {
+               msg->u.ask_channel.trace_format = 1;
+       } else {
+               msg->u.ask_channel.trace_format = 2;
+       }
 
        std::copy(uuid.begin(), uuid.end(), msg->u.ask_channel.uuid);
 
@@ -1020,7 +1027,8 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                unsigned int live_timer_interval,
                bool is_in_live_session,
                unsigned int monitor_timer_interval,
-               struct lttng_trace_chunk *trace_chunk)
+               struct lttng_trace_chunk *trace_chunk,
+               const lttng::trace_format_descriptor& trace_format)
 {
        LTTNG_ASSERT(msg);
 
@@ -1050,6 +1058,11 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.channel.live_timer_interval = live_timer_interval;
        msg->u.channel.is_live = is_in_live_session;
        msg->u.channel.monitor_timer_interval = monitor_timer_interval;
+       if (trace_format.type() == LTTNG_TRACE_FORMAT_DESCRIPTOR_TYPE_CTF_1) {
+               msg->u.channel.trace_format = 1;
+       } else {
+               msg->u.channel.trace_format = 2;
+       }
 
        strncpy(msg->u.channel.pathname, pathname,
                        sizeof(msg->u.channel.pathname));
index d76ec025f85d6773a31747b8092d562932897e6a..33b0353272443738d92d19981ad2707b36819323 100644 (file)
@@ -8,11 +8,12 @@
 #ifndef _CONSUMER_H
 #define _CONSUMER_H
 
+#include <algorithm>
 #include <common/consumer/consumer.hpp>
 #include <common/hashtable/hashtable.hpp>
 #include <lttng/lttng.h>
+#include <lttng/trace-format-descriptor-internal.hpp>
 #include <urcu/ref.h>
-#include <algorithm>
 
 #include "snapshot.hpp"
 
@@ -272,7 +273,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                const char *root_shm_path,
                const char *shm_path,
                struct lttng_trace_chunk *trace_chunk,
-               const struct lttng_credentials *buffer_credentials);
+               const struct lttng_credentials *buffer_credentials,
+               const lttng::trace_format_descriptor& trace_format);
 void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg,
                uint64_t channel_key,
                uint64_t stream_key,
@@ -295,7 +297,8 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                unsigned int live_timer_interval,
                bool is_in_live_session,
                unsigned int monitor_timer_interval,
-               struct lttng_trace_chunk *trace_chunk);
+               struct lttng_trace_chunk *trace_chunk,
+               const lttng::trace_format_descriptor& trace_format);
 int consumer_is_data_pending(uint64_t session_id,
                struct consumer_output *consumer);
 int consumer_close_metadata(struct consumer_socket *socket,
index b51b56c3e772e2e1a213853266f10b79f30591ab..72d9460498611827931e98f6cd30d097b12cd9d2 100644 (file)
@@ -141,22 +141,15 @@ int kernel_consumer_add_channel(struct consumer_socket *sock,
        }
 
        /* Prep channel message structure */
-       consumer_init_add_channel_comm_msg(&lkm,
-                       channel->key,
-                       ksession->id,
-                       &pathname[consumer_path_offset],
-                       consumer->net_seq_index,
-                       channel->channel->name,
-                       channel->stream_count,
-                       channel->channel->attr.output,
-                       CONSUMER_CHANNEL_TYPE_DATA,
+       consumer_init_add_channel_comm_msg(&lkm, channel->key, ksession->id,
+                       &pathname[consumer_path_offset], consumer->net_seq_index,
+                       channel->channel->name, channel->stream_count,
+                       channel->channel->attr.output, CONSUMER_CHANNEL_TYPE_DATA,
                        channel->channel->attr.tracefile_size,
-                       channel->channel->attr.tracefile_count,
-                       monitor,
-                       channel->channel->attr.live_timer_interval,
-                       ksession->is_live_session,
+                       channel->channel->attr.tracefile_count, monitor,
+                       channel->channel->attr.live_timer_interval, ksession->is_live_session,
                        channel_attr_extended->monitor_timer_interval,
-                       ksession->current_trace_chunk);
+                       ksession->current_trace_chunk, *ksession->trace_format);
 
        health_code_update();
 
@@ -217,22 +210,14 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock,
        consumer = ksession->consumer;
 
        /* Prep channel message structure */
-       consumer_init_add_channel_comm_msg(&lkm,
-                       ksession->metadata->key,
-                       ksession->id,
-                       "",
-                       consumer->net_seq_index,
-                       ksession->metadata->conf->name,
-                       1,
-                       ksession->metadata->conf->attr.output,
-                       CONSUMER_CHANNEL_TYPE_METADATA,
+       consumer_init_add_channel_comm_msg(&lkm, ksession->metadata->key, ksession->id, "",
+                       consumer->net_seq_index, ksession->metadata->conf->name, 1,
+                       ksession->metadata->conf->attr.output, CONSUMER_CHANNEL_TYPE_METADATA,
                        ksession->metadata->conf->attr.tracefile_size,
-                       ksession->metadata->conf->attr.tracefile_count,
-                       monitor,
+                       ksession->metadata->conf->attr.tracefile_count, monitor,
                        ksession->metadata->conf->attr.live_timer_interval,
-                       ksession->is_live_session,
-                       0,
-                       ksession->current_trace_chunk);
+                       ksession->is_live_session, 0, ksession->current_trace_chunk,
+                       *ksession->trace_format);
 
        health_code_update();
 
index bb0ec0a2436db1a9fb042b29b1e53b622c2eb60e..292f9cfbd1fffcb099a346c124f4f90622d955a2 100644 (file)
@@ -638,8 +638,7 @@ end:
        return ret;
 }
 
-struct lttng_consumer_stream *consumer_stream_create(
-               struct lttng_consumer_channel *channel,
+struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_channel *channel,
                uint64_t channel_key,
                uint64_t stream_key,
                const char *channel_name,
@@ -649,7 +648,8 @@ struct lttng_consumer_stream *consumer_stream_create(
                int cpu,
                int *alloc_ret,
                enum consumer_channel_type type,
-               unsigned int monitor)
+               unsigned int monitor,
+               int trace_format)
 {
        int ret;
        struct lttng_consumer_stream *stream;
index a8e5ddd5507b3a31ed7db90e50406a89ddab2dfe..6a43706b2a3f8c3d1dfd87bfd1b53e04c58384f6 100644 (file)
@@ -21,8 +21,7 @@ enum consumer_stream_open_packet_status {
  *
  * The channel lock MUST be acquired.
  */
-struct lttng_consumer_stream *consumer_stream_create(
-               struct lttng_consumer_channel *channel,
+struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_channel *channel,
                uint64_t channel_key,
                uint64_t stream_key,
                const char *channel_name,
@@ -32,7 +31,8 @@ struct lttng_consumer_stream *consumer_stream_create(
                int cpu,
                int *alloc_ret,
                enum consumer_channel_type type,
-               unsigned int monitor);
+               unsigned int monitor,
+               int trace_format);
 
 /*
  * Close stream's file descriptors and, if needed, close stream also on the
index 3272c129fe353c2ad6b75f3f359391ce55ced9bf..f34006c1269b5212fdfcb7d5c89bf4ef05bf526d 100644 (file)
@@ -1022,7 +1022,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                unsigned int live_timer_interval,
                bool is_in_live_session,
                const char *root_shm_path,
-               const char *shm_path)
+               const char *shm_path,
+               int trace_format)
 {
        struct lttng_consumer_channel *channel = NULL;
        struct lttng_trace_chunk *trace_chunk = NULL;
@@ -1053,6 +1054,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->monitor = monitor;
        channel->live_timer_interval = live_timer_interval;
        channel->is_live = is_in_live_session;
+       channel->trace_format = trace_format;
        pthread_mutex_init(&channel->lock, NULL);
        pthread_mutex_init(&channel->timer_lock, NULL);
 
index fb7f59bb21883ec8c8888c1069ab66020b548d45..f8ae37c5a9b1ced79d1fd3e82d0a9bf319834e5f 100644 (file)
@@ -254,6 +254,8 @@ struct lttng_consumer_channel {
 
        bool streams_sent_to_relayd;
        uint64_t last_consumed_size_sample_sent;
+
+       int trace_format;
 };
 
 struct stream_subbuffer {
@@ -973,7 +975,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                unsigned int live_timer_interval,
                bool is_in_live_session,
                const char *root_shm_path,
-               const char *shm_path);
+               const char *shm_path,
+               int trace_format);
 void consumer_del_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht);
 void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
index 9bc66a47880d82247644ba8e1b6c4ba7c707253d..4bb2681af603856fc3bfd39dc84e3763116846c4 100644 (file)
@@ -514,17 +514,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                msg.u.channel.session_id,
-                               msg.u.channel.chunk_id.is_set ?
-                                               &chunk_id : NULL,
-                               msg.u.channel.pathname,
-                               msg.u.channel.name,
-                               msg.u.channel.relayd_id, msg.u.channel.output,
-                               msg.u.channel.tracefile_size,
-                               msg.u.channel.tracefile_count, 0,
-                               msg.u.channel.monitor,
-                               msg.u.channel.live_timer_interval,
-                               msg.u.channel.is_live,
-                               NULL, NULL);
+                               msg.u.channel.chunk_id.is_set ? &chunk_id : NULL,
+                               msg.u.channel.pathname, msg.u.channel.name, msg.u.channel.relayd_id,
+                               msg.u.channel.output, msg.u.channel.tracefile_size,
+                               msg.u.channel.tracefile_count, 0, msg.u.channel.monitor,
+                               msg.u.channel.live_timer_interval, msg.u.channel.is_live, NULL,
+                               NULL, msg.u.channel.trace_format);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
@@ -672,18 +667,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_code_update();
 
                pthread_mutex_lock(&channel->lock);
-               new_stream = consumer_stream_create(
-                               channel,
-                               channel->key,
-                               fd,
-                               channel->name,
-                               channel->relayd_id,
-                               channel->session_id,
-                               channel->trace_chunk,
-                               msg.u.stream.cpu,
-                               &alloc_ret,
-                               channel->type,
-                               channel->monitor);
+               new_stream = consumer_stream_create(channel, channel->key, fd, channel->name,
+                               channel->relayd_id, channel->session_id, channel->trace_chunk,
+                               msg.u.stream.cpu, &alloc_ret, channel->type, channel->monitor,
+                               channel->trace_format);
                if (new_stream == NULL) {
                        switch (alloc_ret) {
                        case -ENOMEM:
index 4bcc77bbbb3035ab08e7d2313e42a8538165f995..f5e55ec34e1e7557fbaf1ded64981a9799c1a44d 100644 (file)
@@ -667,6 +667,7 @@ struct lttcomm_consumer_msg {
                        uint8_t is_live;
                        /* timer to sample a channel's positions (usec). */
                        unsigned int monitor_timer_interval;
+                       int trace_format;
                } LTTNG_PACKED channel; /* Only used by Kernel. */
                struct {
                        uint64_t stream_key;
@@ -732,6 +733,7 @@ struct lttcomm_consumer_msg {
                        int64_t blocking_timeout;
                        char root_shm_path[PATH_MAX];
                        char shm_path[PATH_MAX];
+                       int trace_format;
                } LTTNG_PACKED ask_channel;
                struct {
                        uint64_t key;
index 30d1f102142781289be72a965a9b8bfe719486ff..25da16ff0ec85252e4ab0285007aeee50730f8a9 100644 (file)
@@ -97,18 +97,9 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
        LTTNG_ASSERT(channel);
        LTTNG_ASSERT(ctx);
 
-       stream = consumer_stream_create(
-                       channel,
-                       channel->key,
-                       key,
-                       channel->name,
-                       channel->relayd_id,
-                       channel->session_id,
-                       channel->trace_chunk,
-                       cpu,
-                       &alloc_ret,
-                       channel->type,
-                       channel->monitor);
+       stream = consumer_stream_create(channel, channel->key, key, channel->name,
+                       channel->relayd_id, channel->session_id, channel->trace_chunk, cpu,
+                       &alloc_ret, channel->type, channel->monitor, channel->trace_format);
        if (stream == NULL) {
                switch (alloc_ret) {
                case -ENOENT:
@@ -1482,23 +1473,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                };
 
                /* Create a plain object and reserve a channel key. */
-               channel = consumer_allocate_channel(
-                               msg.u.ask_channel.key,
+               channel = consumer_allocate_channel(msg.u.ask_channel.key,
                                msg.u.ask_channel.session_id,
-                               msg.u.ask_channel.chunk_id.is_set ?
-                                               &chunk_id : NULL,
-                               msg.u.ask_channel.pathname,
-                               msg.u.ask_channel.name,
+                               msg.u.ask_channel.chunk_id.is_set ? &chunk_id : NULL,
+                               msg.u.ask_channel.pathname, msg.u.ask_channel.name,
                                msg.u.ask_channel.relayd_id,
                                (enum lttng_event_output) msg.u.ask_channel.output,
-                               msg.u.ask_channel.tracefile_size,
-                               msg.u.ask_channel.tracefile_count,
-                               msg.u.ask_channel.session_id_per_pid,
-                               msg.u.ask_channel.monitor,
-                               msg.u.ask_channel.live_timer_interval,
-                               msg.u.ask_channel.is_live,
-                               msg.u.ask_channel.root_shm_path,
-                               msg.u.ask_channel.shm_path);
+                               msg.u.ask_channel.tracefile_size, msg.u.ask_channel.tracefile_count,
+                               msg.u.ask_channel.session_id_per_pid, msg.u.ask_channel.monitor,
+                               msg.u.ask_channel.live_timer_interval, msg.u.ask_channel.is_live,
+                               msg.u.ask_channel.root_shm_path, msg.u.ask_channel.shm_path,
+                               msg.u.ask_channel.trace_format);
                if (!channel) {
                        goto end_channel_error;
                }
This page took 0.033489 seconds and 5 git commands to generate.