Propagate trace format to relayd on session creation
[deliverable/lttng-tools.git] / src / bin / lttng-sessiond / consumer.cpp
index 95fc2c34875ce9fff37d3454a8c96e7de1e3a7fb..dcbcc892852b2102d839fd3a0ea78ebdb9d7098f 100644 (file)
 #include <unistd.h>
 #include <inttypes.h>
 
-#include <common/common.h>
-#include <common/defaults.h>
-#include <common/uri.h>
-#include <common/relayd/relayd.h>
-#include <common/string-utils/format.h>
-
-#include "consumer.h"
-#include "health-sessiond.h"
-#include "ust-app.h"
-#include "utils.h"
-#include "lttng-sessiond.h"
+#include <common/common.hpp>
+#include <common/defaults.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/string-utils/format.hpp>
+#include <common/uri.hpp>
+#include <lttng/trace-format-descriptor-internal.hpp>
+
+#include "consumer.hpp"
+#include "health-sessiond.hpp"
+#include "ust-app.hpp"
+#include "utils.hpp"
+#include "lttng-sessiond.hpp"
 
 /*
  * Return allocated full pathname of the session using the consumer trace path
@@ -49,7 +50,7 @@ char *setup_channel_trace_path(struct consumer_output *consumer,
         * Allocate the string ourself to make sure we never exceed
         * LTTNG_PATH_MAX.
         */
-       pathname = (char *) zmalloc(LTTNG_PATH_MAX);
+       pathname = calloc<char>(LTTNG_PATH_MAX);
        if (!pathname) {
                goto error;
        }
@@ -367,6 +368,8 @@ struct consumer_socket *consumer_find_socket_by_bitness(int bits,
        int consumer_fd;
        struct consumer_socket *socket = NULL;
 
+       ASSERT_RCU_READ_LOCKED();
+
        switch (bits) {
        case 64:
                consumer_fd = uatomic_read(&the_ust_consumerd64_fd);
@@ -401,6 +404,8 @@ struct consumer_socket *consumer_find_socket(int key,
        struct lttng_ht_node_ulong *node;
        struct consumer_socket *socket = NULL;
 
+       ASSERT_RCU_READ_LOCKED();
+
        /* Negative keys are lookup failures */
        if (key < 0 || consumer == NULL) {
                return NULL;
@@ -410,7 +415,7 @@ struct consumer_socket *consumer_find_socket(int key,
                        &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
        if (node != NULL) {
-               socket = caa_container_of(node, struct consumer_socket, node);
+               socket = lttng::utils::container_of(node, &consumer_socket::node);
        }
 
        return socket;
@@ -425,7 +430,7 @@ struct consumer_socket *consumer_allocate_socket(int *fd)
 
        LTTNG_ASSERT(fd);
 
-       socket = (consumer_socket *) zmalloc(sizeof(struct consumer_socket));
+       socket = zmalloc<consumer_socket>();
        if (socket == NULL) {
                PERROR("zmalloc consumer socket");
                goto error;
@@ -447,6 +452,7 @@ void consumer_add_socket(struct consumer_socket *sock,
 {
        LTTNG_ASSERT(sock);
        LTTNG_ASSERT(consumer);
+       ASSERT_RCU_READ_LOCKED();
 
        lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
 }
@@ -463,6 +469,7 @@ void consumer_del_socket(struct consumer_socket *sock,
 
        LTTNG_ASSERT(sock);
        LTTNG_ASSERT(consumer);
+       ASSERT_RCU_READ_LOCKED();
 
        iter.iter.node = &sock->node.node;
        ret = lttng_ht_del(consumer->socks, &iter);
@@ -475,16 +482,18 @@ void consumer_del_socket(struct consumer_socket *sock,
 static void destroy_socket_rcu(struct rcu_head *head)
 {
        struct lttng_ht_node_ulong *node =
-               caa_container_of(head, struct lttng_ht_node_ulong, head);
+               lttng::utils::container_of(head, &lttng_ht_node_ulong::head);
        struct consumer_socket *socket =
-               caa_container_of(node, struct consumer_socket, node);
+               lttng::utils::container_of(node, &consumer_socket::node);
 
        free(socket);
 }
 
 /*
- * Destroy and free socket pointer in a call RCU. Read side lock must be
- * acquired before calling this function.
+ * Destroy and free socket pointer in a call RCU. The call must either:
+ *  - have acquired the read side lock before calling this function, or
+ *  - guarantee the validity of the `struct consumer_socket` object for the
+ *    duration of the call.
  */
 void consumer_destroy_socket(struct consumer_socket *sock)
 {
@@ -512,7 +521,7 @@ struct consumer_output *consumer_create_output(enum consumer_dst_type type)
 {
        struct consumer_output *output = NULL;
 
-       output = (consumer_output *) zmalloc(sizeof(struct consumer_output));
+       output = zmalloc<consumer_output>();
        if (output == NULL) {
                PERROR("zmalloc consumer_output");
                goto error;
@@ -558,7 +567,7 @@ void consumer_destroy_output_sockets(struct consumer_output *obj)
 static void consumer_release_output(struct urcu_ref *ref)
 {
        struct consumer_output *obj =
-               caa_container_of(ref, struct consumer_output, ref);
+               lttng::utils::container_of(ref, &consumer_output::ref);
 
        consumer_destroy_output_sockets(obj);
 
@@ -916,7 +925,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                const char *name,
                uint64_t relayd_id,
                uint64_t key,
-               unsigned char *uuid,
+               const lttng_uuid& uuid,
                uint32_t chan_id,
                uint64_t tracefile_size,
                uint64_t tracefile_count,
@@ -927,7 +936,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                const char *root_shm_path,
                const char *shm_path,
                struct lttng_trace_chunk *trace_chunk,
-               const struct lttng_credentials *buffer_credentials)
+               const struct lttng_credentials *buffer_credentials,
+               const lttng::trace_format_descriptor& trace_format)
 {
        LTTNG_ASSERT(msg);
 
@@ -970,8 +980,13 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.ask_channel.monitor = monitor;
        msg->u.ask_channel.ust_app_uid = ust_app_uid;
        msg->u.ask_channel.blocking_timeout = blocking_timeout;
+       if (trace_format.type() == LTTNG_TRACE_FORMAT_DESCRIPTOR_TYPE_CTF_1) {
+               msg->u.ask_channel.trace_format = 1;
+       } else {
+               msg->u.ask_channel.trace_format = 2;
+       }
 
-       memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid));
+       std::copy(uuid.begin(), uuid.end(), msg->u.ask_channel.uuid);
 
        if (pathname) {
                strncpy(msg->u.ask_channel.pathname, pathname,
@@ -1001,8 +1016,6 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                uint64_t channel_key,
                uint64_t session_id,
                const char *pathname,
-               uid_t uid,
-               gid_t gid,
                uint64_t relayd_id,
                const char *name,
                unsigned int nb_init_streams,
@@ -1014,7 +1027,8 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                unsigned int live_timer_interval,
                bool is_in_live_session,
                unsigned int monitor_timer_interval,
-               struct lttng_trace_chunk *trace_chunk)
+               struct lttng_trace_chunk *trace_chunk,
+               const lttng::trace_format_descriptor& trace_format)
 {
        LTTNG_ASSERT(msg);
 
@@ -1044,6 +1058,11 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.channel.live_timer_interval = live_timer_interval;
        msg->u.channel.is_live = is_in_live_session;
        msg->u.channel.monitor_timer_interval = monitor_timer_interval;
+       if (trace_format.type() == LTTNG_TRACE_FORMAT_DESCRIPTOR_TYPE_CTF_1) {
+               msg->u.channel.trace_format = 1;
+       } else {
+               msg->u.channel.trace_format = 2;
+       }
 
        strncpy(msg->u.channel.pathname, pathname,
                        sizeof(msg->u.channel.pathname));
@@ -1120,12 +1139,12 @@ error:
  * On success return positive value. On error, negative value.
  */
 int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
-               struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer,
-               enum lttng_stream_type type, uint64_t session_id,
-               const char *session_name, const char *hostname,
-               const char *base_path, int session_live_timer,
-               const uint64_t *current_chunk_id, time_t session_creation_time,
-               bool session_name_contains_creation_time)
+               struct lttcomm_relayd_sock *rsock,
+               struct consumer_output *consumer,
+               enum lttng_stream_type type,
+               const struct ltt_session& session,
+               const char *base_path_override,
+               const uint64_t *current_chunk_id)
 {
        int ret;
        int fd;
@@ -1136,6 +1155,8 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
        LTTNG_ASSERT(consumer);
        LTTNG_ASSERT(consumer_sock);
 
+       const char *base_path = base_path_override != NULL ? base_path_override : session.base_path;
+
        memset(&msg, 0, sizeof(msg));
        /* Bail out if consumer is disabled */
        if (!consumer->enabled) {
@@ -1147,13 +1168,11 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
                char output_path[LTTNG_PATH_MAX] = {};
                uint64_t relayd_session_id;
 
-               ret = relayd_create_session(rsock, &relayd_session_id,
-                               session_name, hostname, base_path,
-                               session_live_timer, consumer->snapshot,
-                               session_id, the_sessiond_uuid, current_chunk_id,
-                               session_creation_time,
-                               session_name_contains_creation_time,
-                               output_path);
+               ret = relayd_create_session(rsock, &relayd_session_id, session.name,
+                               session.hostname, base_path, session.live_timer, consumer->snapshot,
+                               session.id, the_sessiond_uuid, current_chunk_id,
+                               session.creation_time, session.name_contains_creation_time,
+                               output_path, *session.trace_format);
                if (ret < 0) {
                        /* Close the control socket. */
                        (void) relayd_close(rsock);
@@ -1172,8 +1191,10 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
         */
        msg.u.relayd_sock.net_index = consumer->net_seq_index;
        msg.u.relayd_sock.type = type;
-       msg.u.relayd_sock.session_id = session_id;
-       memcpy(&msg.u.relayd_sock.sock, rsock, sizeof(msg.u.relayd_sock.sock));
+       msg.u.relayd_sock.session_id = session.id;
+       msg.u.relayd_sock.major = rsock->major;
+       msg.u.relayd_sock.minor = rsock->minor;
+       msg.u.relayd_sock.relayd_socket_protocol = rsock->sock.proto;
 
        DBG3("Sending relayd sock info to consumer on %d", *consumer_sock->fd_ptr);
        ret = consumer_send_msg(consumer_sock, &msg);
@@ -1451,6 +1472,7 @@ int consumer_push_metadata(struct consumer_socket *socket,
        struct lttcomm_consumer_msg msg;
 
        LTTNG_ASSERT(socket);
+       ASSERT_RCU_READ_LOCKED();
 
        DBG2("Consumer push metadata to consumer socket %d", *socket->fd_ptr);
 
@@ -1496,7 +1518,7 @@ end:
  */
 enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket,
                uint64_t key, const struct consumer_output *output, int metadata,
-               uid_t uid, gid_t gid, const char *channel_path, int wait,
+               const char *channel_path,
                uint64_t nb_packets_per_stream)
 {
        int ret;
@@ -1676,7 +1698,7 @@ end:
  * chunk each stream is currently writing to (for the rotate_pending operation).
  */
 int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
-               uid_t uid, gid_t gid, struct consumer_output *output,
+               struct consumer_output *output,
                bool is_metadata_channel)
 {
        int ret;
@@ -1722,6 +1744,7 @@ int consumer_open_channel_packets(struct consumer_socket *socket, uint64_t key)
        int ret;
        lttcomm_consumer_msg msg = {
                .cmd_type = LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS,
+               .u = {},
        };
        msg.u.open_channel_packets.key = key;
 
@@ -1772,17 +1795,18 @@ error_socket:
 }
 
 int consumer_init(struct consumer_socket *socket,
-               const lttng_uuid sessiond_uuid)
+               const lttng_uuid& sessiond_uuid)
 {
        int ret;
        struct lttcomm_consumer_msg msg = {
                .cmd_type = LTTNG_CONSUMER_INIT,
+               .u = {},
        };
 
        LTTNG_ASSERT(socket);
 
        DBG("Sending consumer initialization command");
-       lttng_uuid_copy(msg.u.init.sessiond_uuid, sessiond_uuid);
+       std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg.u.init.sessiond_uuid);
 
        health_code_update();
        ret = consumer_send_msg(socket, &msg);
@@ -1821,6 +1845,7 @@ int consumer_create_trace_chunk(struct consumer_socket *socket,
        enum lttng_trace_chunk_status tc_status;
        struct lttcomm_consumer_msg msg = {
                .cmd_type = LTTNG_CONSUMER_CREATE_TRACE_CHUNK,
+               .u = {},
        };
        msg.u.create_trace_chunk.session_id = session_id;
 
@@ -1975,6 +2000,7 @@ int consumer_close_trace_chunk(struct consumer_socket *socket,
        enum lttng_trace_chunk_status chunk_status;
        lttcomm_consumer_msg msg = {
                .cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
+               .u = {},
        };
        msg.u.close_trace_chunk.session_id = session_id;
 
@@ -2101,6 +2127,7 @@ int consumer_trace_chunk_exists(struct consumer_socket *socket,
        enum lttng_trace_chunk_status chunk_status;
        lttcomm_consumer_msg msg = {
                .cmd_type = LTTNG_CONSUMER_TRACE_CHUNK_EXISTS,
+               .u = {},
        };
        msg.u.trace_chunk_exists.session_id = session_id;
 
This page took 0.03195 seconds and 5 git commands to generate.