From 8696b40b84aefe0800e9e5facb461ef4ad0fd370 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Thu, 18 Aug 2022 16:39:37 -0400 Subject: [PATCH] Propagate trace format all the way to the consumer TODO: currently we end up propagating and exposing the trace format as a vulgar integer on lttng-consumerd side. This is one of the frontier we discussed IRL. TODO: use the trace format serialize and pass a complete trace format object on the pipe Note that this is necessary to later decide if the consumerd strips the ringbuffer header or not based on the ctf version. As discussed IRL, ust could be modified to expose an API allowing us to set the ctf version thus propagating the ctf version inside the ringbuffer header and perofming introspection at the consumer level and split based on the value. This was simply easier to do since we have all the data available at hand. Signed-off-by: Jonathan Rajotte Change-Id: I4d12692fe2cab8ad9bf22db9b1d455ec397fa843 --- src/bin/lttng-sessiond/consumer.cpp | 19 +++++++-- src/bin/lttng-sessiond/consumer.hpp | 9 ++-- src/bin/lttng-sessiond/kernel-consumer.cpp | 41 ++++++------------- src/common/consumer/consumer-stream.cpp | 6 +-- src/common/consumer/consumer-stream.hpp | 6 +-- src/common/consumer/consumer.cpp | 4 +- src/common/consumer/consumer.hpp | 5 ++- .../kernel-consumer/kernel-consumer.cpp | 33 +++++---------- src/common/sessiond-comm/sessiond-comm.hpp | 2 + src/common/ust-consumer/ust-consumer.cpp | 37 +++++------------ 10 files changed, 71 insertions(+), 91 deletions(-) diff --git a/src/bin/lttng-sessiond/consumer.cpp b/src/bin/lttng-sessiond/consumer.cpp index c0f8eb589..a482dde8a 100644 --- a/src/bin/lttng-sessiond/consumer.cpp +++ b/src/bin/lttng-sessiond/consumer.cpp @@ -17,9 +17,10 @@ #include #include -#include #include #include +#include +#include #include "consumer.hpp" #include "health-sessiond.hpp" @@ -935,7 +936,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, const char *root_shm_path, const char *shm_path, struct lttng_trace_chunk *trace_chunk, - const struct lttng_credentials *buffer_credentials) + const struct lttng_credentials *buffer_credentials, + const lttng::trace_format_descriptor& trace_format) { LTTNG_ASSERT(msg); @@ -978,6 +980,11 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.monitor = monitor; msg->u.ask_channel.ust_app_uid = ust_app_uid; msg->u.ask_channel.blocking_timeout = blocking_timeout; + if (trace_format.type() == LTTNG_TRACE_FORMAT_DESCRIPTOR_TYPE_CTF_1) { + msg->u.ask_channel.trace_format = 1; + } else { + msg->u.ask_channel.trace_format = 2; + } std::copy(uuid.begin(), uuid.end(), msg->u.ask_channel.uuid); @@ -1020,7 +1027,8 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, unsigned int live_timer_interval, bool is_in_live_session, unsigned int monitor_timer_interval, - struct lttng_trace_chunk *trace_chunk) + struct lttng_trace_chunk *trace_chunk, + const lttng::trace_format_descriptor& trace_format) { LTTNG_ASSERT(msg); @@ -1050,6 +1058,11 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.channel.live_timer_interval = live_timer_interval; msg->u.channel.is_live = is_in_live_session; msg->u.channel.monitor_timer_interval = monitor_timer_interval; + if (trace_format.type() == LTTNG_TRACE_FORMAT_DESCRIPTOR_TYPE_CTF_1) { + msg->u.channel.trace_format = 1; + } else { + msg->u.channel.trace_format = 2; + } strncpy(msg->u.channel.pathname, pathname, sizeof(msg->u.channel.pathname)); diff --git a/src/bin/lttng-sessiond/consumer.hpp b/src/bin/lttng-sessiond/consumer.hpp index d76ec025f..33b035327 100644 --- a/src/bin/lttng-sessiond/consumer.hpp +++ b/src/bin/lttng-sessiond/consumer.hpp @@ -8,11 +8,12 @@ #ifndef _CONSUMER_H #define _CONSUMER_H +#include #include #include #include +#include #include -#include #include "snapshot.hpp" @@ -272,7 +273,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, const char *root_shm_path, const char *shm_path, struct lttng_trace_chunk *trace_chunk, - const struct lttng_credentials *buffer_credentials); + const struct lttng_credentials *buffer_credentials, + const lttng::trace_format_descriptor& trace_format); void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t channel_key, uint64_t stream_key, @@ -295,7 +297,8 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, unsigned int live_timer_interval, bool is_in_live_session, unsigned int monitor_timer_interval, - struct lttng_trace_chunk *trace_chunk); + struct lttng_trace_chunk *trace_chunk, + const lttng::trace_format_descriptor& trace_format); int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer); int consumer_close_metadata(struct consumer_socket *socket, diff --git a/src/bin/lttng-sessiond/kernel-consumer.cpp b/src/bin/lttng-sessiond/kernel-consumer.cpp index b51b56c3e..72d946049 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.cpp +++ b/src/bin/lttng-sessiond/kernel-consumer.cpp @@ -141,22 +141,15 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, } /* Prep channel message structure */ - consumer_init_add_channel_comm_msg(&lkm, - channel->key, - ksession->id, - &pathname[consumer_path_offset], - consumer->net_seq_index, - channel->channel->name, - channel->stream_count, - channel->channel->attr.output, - CONSUMER_CHANNEL_TYPE_DATA, + consumer_init_add_channel_comm_msg(&lkm, channel->key, ksession->id, + &pathname[consumer_path_offset], consumer->net_seq_index, + channel->channel->name, channel->stream_count, + channel->channel->attr.output, CONSUMER_CHANNEL_TYPE_DATA, channel->channel->attr.tracefile_size, - channel->channel->attr.tracefile_count, - monitor, - channel->channel->attr.live_timer_interval, - ksession->is_live_session, + channel->channel->attr.tracefile_count, monitor, + channel->channel->attr.live_timer_interval, ksession->is_live_session, channel_attr_extended->monitor_timer_interval, - ksession->current_trace_chunk); + ksession->current_trace_chunk, *ksession->trace_format); health_code_update(); @@ -217,22 +210,14 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, consumer = ksession->consumer; /* Prep channel message structure */ - consumer_init_add_channel_comm_msg(&lkm, - ksession->metadata->key, - ksession->id, - "", - consumer->net_seq_index, - ksession->metadata->conf->name, - 1, - ksession->metadata->conf->attr.output, - CONSUMER_CHANNEL_TYPE_METADATA, + consumer_init_add_channel_comm_msg(&lkm, ksession->metadata->key, ksession->id, "", + consumer->net_seq_index, ksession->metadata->conf->name, 1, + ksession->metadata->conf->attr.output, CONSUMER_CHANNEL_TYPE_METADATA, ksession->metadata->conf->attr.tracefile_size, - ksession->metadata->conf->attr.tracefile_count, - monitor, + ksession->metadata->conf->attr.tracefile_count, monitor, ksession->metadata->conf->attr.live_timer_interval, - ksession->is_live_session, - 0, - ksession->current_trace_chunk); + ksession->is_live_session, 0, ksession->current_trace_chunk, + *ksession->trace_format); health_code_update(); diff --git a/src/common/consumer/consumer-stream.cpp b/src/common/consumer/consumer-stream.cpp index bb0ec0a24..292f9cfbd 100644 --- a/src/common/consumer/consumer-stream.cpp +++ b/src/common/consumer/consumer-stream.cpp @@ -638,8 +638,7 @@ end: return ret; } -struct lttng_consumer_stream *consumer_stream_create( - struct lttng_consumer_channel *channel, +struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_channel *channel, uint64_t channel_key, uint64_t stream_key, const char *channel_name, @@ -649,7 +648,8 @@ struct lttng_consumer_stream *consumer_stream_create( int cpu, int *alloc_ret, enum consumer_channel_type type, - unsigned int monitor) + unsigned int monitor, + int trace_format) { int ret; struct lttng_consumer_stream *stream; diff --git a/src/common/consumer/consumer-stream.hpp b/src/common/consumer/consumer-stream.hpp index a8e5ddd55..6a43706b2 100644 --- a/src/common/consumer/consumer-stream.hpp +++ b/src/common/consumer/consumer-stream.hpp @@ -21,8 +21,7 @@ enum consumer_stream_open_packet_status { * * The channel lock MUST be acquired. */ -struct lttng_consumer_stream *consumer_stream_create( - struct lttng_consumer_channel *channel, +struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_channel *channel, uint64_t channel_key, uint64_t stream_key, const char *channel_name, @@ -32,7 +31,8 @@ struct lttng_consumer_stream *consumer_stream_create( int cpu, int *alloc_ret, enum consumer_channel_type type, - unsigned int monitor); + unsigned int monitor, + int trace_format); /* * Close stream's file descriptors and, if needed, close stream also on the diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index 3272c129f..f34006c12 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -1022,7 +1022,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, unsigned int live_timer_interval, bool is_in_live_session, const char *root_shm_path, - const char *shm_path) + const char *shm_path, + int trace_format) { struct lttng_consumer_channel *channel = NULL; struct lttng_trace_chunk *trace_chunk = NULL; @@ -1053,6 +1054,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->monitor = monitor; channel->live_timer_interval = live_timer_interval; channel->is_live = is_in_live_session; + channel->trace_format = trace_format; pthread_mutex_init(&channel->lock, NULL); pthread_mutex_init(&channel->timer_lock, NULL); diff --git a/src/common/consumer/consumer.hpp b/src/common/consumer/consumer.hpp index fb7f59bb2..f8ae37c5a 100644 --- a/src/common/consumer/consumer.hpp +++ b/src/common/consumer/consumer.hpp @@ -254,6 +254,8 @@ struct lttng_consumer_channel { bool streams_sent_to_relayd; uint64_t last_consumed_size_sample_sent; + + int trace_format; }; struct stream_subbuffer { @@ -973,7 +975,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, unsigned int live_timer_interval, bool is_in_live_session, const char *root_shm_path, - const char *shm_path); + const char *shm_path, + int trace_format); void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, diff --git a/src/common/kernel-consumer/kernel-consumer.cpp b/src/common/kernel-consumer/kernel-consumer.cpp index 9bc66a478..4bb2681af 100644 --- a/src/common/kernel-consumer/kernel-consumer.cpp +++ b/src/common/kernel-consumer/kernel-consumer.cpp @@ -514,17 +514,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, msg.u.channel.session_id, - msg.u.channel.chunk_id.is_set ? - &chunk_id : NULL, - msg.u.channel.pathname, - msg.u.channel.name, - msg.u.channel.relayd_id, msg.u.channel.output, - msg.u.channel.tracefile_size, - msg.u.channel.tracefile_count, 0, - msg.u.channel.monitor, - msg.u.channel.live_timer_interval, - msg.u.channel.is_live, - NULL, NULL); + msg.u.channel.chunk_id.is_set ? &chunk_id : NULL, + msg.u.channel.pathname, msg.u.channel.name, msg.u.channel.relayd_id, + msg.u.channel.output, msg.u.channel.tracefile_size, + msg.u.channel.tracefile_count, 0, msg.u.channel.monitor, + msg.u.channel.live_timer_interval, msg.u.channel.is_live, NULL, + NULL, msg.u.channel.trace_format); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -672,18 +667,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); pthread_mutex_lock(&channel->lock); - new_stream = consumer_stream_create( - channel, - channel->key, - fd, - channel->name, - channel->relayd_id, - channel->session_id, - channel->trace_chunk, - msg.u.stream.cpu, - &alloc_ret, - channel->type, - channel->monitor); + new_stream = consumer_stream_create(channel, channel->key, fd, channel->name, + channel->relayd_id, channel->session_id, channel->trace_chunk, + msg.u.stream.cpu, &alloc_ret, channel->type, channel->monitor, + channel->trace_format); if (new_stream == NULL) { switch (alloc_ret) { case -ENOMEM: diff --git a/src/common/sessiond-comm/sessiond-comm.hpp b/src/common/sessiond-comm/sessiond-comm.hpp index 4bcc77bbb..f5e55ec34 100644 --- a/src/common/sessiond-comm/sessiond-comm.hpp +++ b/src/common/sessiond-comm/sessiond-comm.hpp @@ -667,6 +667,7 @@ struct lttcomm_consumer_msg { uint8_t is_live; /* timer to sample a channel's positions (usec). */ unsigned int monitor_timer_interval; + int trace_format; } LTTNG_PACKED channel; /* Only used by Kernel. */ struct { uint64_t stream_key; @@ -732,6 +733,7 @@ struct lttcomm_consumer_msg { int64_t blocking_timeout; char root_shm_path[PATH_MAX]; char shm_path[PATH_MAX]; + int trace_format; } LTTNG_PACKED ask_channel; struct { uint64_t key; diff --git a/src/common/ust-consumer/ust-consumer.cpp b/src/common/ust-consumer/ust-consumer.cpp index 30d1f1021..25da16ff0 100644 --- a/src/common/ust-consumer/ust-consumer.cpp +++ b/src/common/ust-consumer/ust-consumer.cpp @@ -97,18 +97,9 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, LTTNG_ASSERT(channel); LTTNG_ASSERT(ctx); - stream = consumer_stream_create( - channel, - channel->key, - key, - channel->name, - channel->relayd_id, - channel->session_id, - channel->trace_chunk, - cpu, - &alloc_ret, - channel->type, - channel->monitor); + stream = consumer_stream_create(channel, channel->key, key, channel->name, + channel->relayd_id, channel->session_id, channel->trace_chunk, cpu, + &alloc_ret, channel->type, channel->monitor, channel->trace_format); if (stream == NULL) { switch (alloc_ret) { case -ENOENT: @@ -1482,23 +1473,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, }; /* Create a plain object and reserve a channel key. */ - channel = consumer_allocate_channel( - msg.u.ask_channel.key, + channel = consumer_allocate_channel(msg.u.ask_channel.key, msg.u.ask_channel.session_id, - msg.u.ask_channel.chunk_id.is_set ? - &chunk_id : NULL, - msg.u.ask_channel.pathname, - msg.u.ask_channel.name, + msg.u.ask_channel.chunk_id.is_set ? &chunk_id : NULL, + msg.u.ask_channel.pathname, msg.u.ask_channel.name, msg.u.ask_channel.relayd_id, (enum lttng_event_output) msg.u.ask_channel.output, - msg.u.ask_channel.tracefile_size, - msg.u.ask_channel.tracefile_count, - msg.u.ask_channel.session_id_per_pid, - msg.u.ask_channel.monitor, - msg.u.ask_channel.live_timer_interval, - msg.u.ask_channel.is_live, - msg.u.ask_channel.root_shm_path, - msg.u.ask_channel.shm_path); + msg.u.ask_channel.tracefile_size, msg.u.ask_channel.tracefile_count, + msg.u.ask_channel.session_id_per_pid, msg.u.ask_channel.monitor, + msg.u.ask_channel.live_timer_interval, msg.u.ask_channel.is_live, + msg.u.ask_channel.root_shm_path, msg.u.ask_channel.shm_path, + msg.u.ask_channel.trace_format); if (!channel) { goto end_channel_error; } -- 2.34.1