Propagate trace format to relayd on session creation
[deliverable/lttng-tools.git] / src / common / relayd / relayd.cpp
index 12658dc6856655320203e84e8e542d97130876c5..1bebc4c6a4eae8c7316697720cbb4d5de1a69b2b 100644 (file)
 #include <sys/stat.h>
 #include <inttypes.h>
 
+#include <bitset>
 #include <common/common.hpp>
-#include <common/defaults.hpp>
 #include <common/compat/endian.hpp>
 #include <common/compat/string.hpp>
-#include <common/sessiond-comm/relayd.hpp>
+#include <common/defaults.hpp>
 #include <common/index/ctf-index.hpp>
-#include <common/trace-chunk.hpp>
+#include <common/sessiond-comm/relayd.hpp>
 #include <common/string-utils/format.hpp>
+#include <common/trace-chunk.hpp>
+#include <lttng/trace-format-descriptor-internal.hpp>
 
 #include "relayd.hpp"
 
@@ -45,6 +47,16 @@ bool relayd_supports_get_configuration(const struct lttcomm_relayd_sock *sock)
        return false;
 }
 
+static bool relayd_supports_get_configuration_trace_format(const struct lttcomm_relayd_sock *sock)
+{
+       if (sock->major > 2) {
+               return true;
+       } else if (sock->major == 2 && sock->minor >= 15) {
+               return true;
+       }
+       return false;
+}
+
 /*
  * Send command. Fill up the header and append the data.
  */
@@ -68,7 +80,7 @@ static int send_command(struct lttcomm_relayd_sock *rsock,
        buf = calloc<char>(buf_size);
        if (buf == NULL) {
                PERROR("zmalloc relayd send command buf");
-               ret = -1;
+               ret = -ENOMEM;
                goto alloc_error;
        }
 
@@ -92,7 +104,6 @@ static int send_command(struct lttcomm_relayd_sock *rsock,
        if (ret < 0) {
                PERROR("Failed to send command %s of size %" PRIu64,
                                lttcomm_relayd_command_str(cmd), buf_size);
-       ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
                ret = -errno;
                goto error;
        }
@@ -134,6 +145,116 @@ error:
        return ret;
 }
 
+/*
+ * Starting from 2.15, the trace format for the session is sent to lttng-relayd
+ * on creation.
+ * */
+static int relayd_create_session_2_15(struct lttcomm_relayd_sock *rsock,
+               const char *session_name,
+               const char *hostname,
+               const char *base_path,
+               int session_live_timer,
+               unsigned int snapshot,
+               uint64_t sessiond_session_id,
+               const lttng_uuid& sessiond_uuid,
+               const uint64_t *current_chunk_id,
+               time_t creation_time,
+               bool session_name_contains_creation_time,
+               lttcomm_relayd_create_session_reply_2_15 *reply,
+               char *output_path,
+               const lttng::trace_format_descriptor& trace_format)
+{
+       int ret;
+       struct lttcomm_relayd_create_session_2_15 *msg = NULL;
+       size_t session_name_len;
+       size_t hostname_len;
+       size_t base_path_len;
+       size_t msg_length;
+       char *dst;
+
+       if (!base_path) {
+               base_path = "";
+       }
+       /* The three names are sent with a '\0' delimiter between them. */
+       session_name_len = strlen(session_name) + 1;
+       hostname_len = strlen(hostname) + 1;
+       base_path_len = strlen(base_path) + 1;
+
+       msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
+       msg = zmalloc<lttcomm_relayd_create_session_2_15>(msg_length);
+       if (!msg) {
+               PERROR("zmalloc create_session_2_11 command message");
+               ret = -1;
+               goto error;
+       }
+
+       LTTNG_ASSERT(session_name_len <= UINT32_MAX);
+       msg->session_name_len = htobe32(session_name_len);
+
+       LTTNG_ASSERT(hostname_len <= UINT32_MAX);
+       msg->hostname_len = htobe32(hostname_len);
+
+       LTTNG_ASSERT(base_path_len <= UINT32_MAX);
+       msg->base_path_len = htobe32(base_path_len);
+
+       dst = msg->names;
+       if (lttng_strncpy(dst, session_name, session_name_len)) {
+               ret = -1;
+               goto error;
+       }
+       dst += session_name_len;
+       if (lttng_strncpy(dst, hostname, hostname_len)) {
+               ret = -1;
+               goto error;
+       }
+       dst += hostname_len;
+       if (lttng_strncpy(dst, base_path, base_path_len)) {
+               ret = -1;
+               goto error;
+       }
+
+       msg->live_timer = htobe32(session_live_timer);
+       msg->snapshot = !!snapshot;
+
+       std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg->sessiond_uuid);
+       msg->session_id = htobe64(sessiond_session_id);
+       msg->session_name_contains_creation_time = session_name_contains_creation_time;
+       msg->trace_format = (uint8_t) trace_format.relayd_type();
+       if (current_chunk_id) {
+               LTTNG_OPTIONAL_SET(&msg->current_chunk_id, htobe64(*current_chunk_id));
+       }
+
+       msg->creation_time = htobe64((uint64_t) creation_time);
+
+       /* Send command */
+       ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
+       if (ret < 0) {
+               goto error;
+       }
+       /* Receive response */
+       ret = recv_reply(rsock, reply, sizeof(*reply));
+       if (ret < 0) {
+               goto error;
+       }
+       reply->generic.session_id = be64toh(reply->generic.session_id);
+       reply->generic.ret_code = be32toh(reply->generic.ret_code);
+       reply->output_path_length = be32toh(reply->output_path_length);
+       if (reply->output_path_length >= LTTNG_PATH_MAX) {
+               ERR("Invalid session output path length in reply (%" PRIu32
+                   " bytes) exceeds maximal allowed length (%d bytes)",
+                               reply->output_path_length, LTTNG_PATH_MAX);
+               ret = -1;
+               goto error;
+       }
+       ret = recv_reply(rsock, output_path, reply->output_path_length);
+       if (ret < 0) {
+               goto error;
+       }
+error:
+       free(msg);
+       return ret;
+}
+
 /*
  * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name,
  * hostname, and base_path) have no length restriction on the sender side.
@@ -313,16 +434,21 @@ error:
  */
 int relayd_create_session(struct lttcomm_relayd_sock *rsock,
                uint64_t *relayd_session_id,
-               const char *session_name, const char *hostname,
-               const char *base_path, int session_live_timer,
-               unsigned int snapshot, uint64_t sessiond_session_id,
+               const char *session_name,
+               const char *hostname,
+               const char *base_path,
+               int session_live_timer,
+               unsigned int snapshot,
+               uint64_t sessiond_session_id,
                const lttng_uuid& sessiond_uuid,
                const uint64_t *current_chunk_id,
-               time_t creation_time, bool session_name_contains_creation_time,
-               char *output_path)
+               time_t creation_time,
+               bool session_name_contains_creation_time,
+               char *output_path,
+               lttng::trace_format_descriptor& trace_format)
 {
        int ret;
-       struct lttcomm_relayd_create_session_reply_2_11 reply = {};
+       lttcomm_relayd_create_session_reply_2_15 reply = {};
 
        LTTNG_ASSERT(rsock);
        LTTNG_ASSERT(relayd_session_id);
@@ -334,17 +460,20 @@ int relayd_create_session(struct lttcomm_relayd_sock *rsock,
                ret = relayd_create_session_2_1(rsock, &reply.generic);
        } else if (rsock->minor >= 4 && rsock->minor < 11) {
                /* From 2.4 to 2.10 */
-               ret = relayd_create_session_2_4(rsock, session_name,
-                               hostname, session_live_timer, snapshot,
-                               &reply.generic);
+               ret = relayd_create_session_2_4(rsock, session_name, hostname, session_live_timer,
+                               snapshot, &reply.generic);
+       } else if (rsock->minor < 15) {
+               /* From 2.11 to 2.14 */
+               ret = relayd_create_session_2_11(rsock, session_name, hostname, base_path,
+                               session_live_timer, snapshot, sessiond_session_id, sessiond_uuid,
+                               current_chunk_id, creation_time,
+                               session_name_contains_creation_time, &reply, output_path);
        } else {
-               /* From 2.11 to ... */
-               ret = relayd_create_session_2_11(rsock, session_name,
-                               hostname, base_path, session_live_timer, snapshot,
-                               sessiond_session_id, sessiond_uuid,
+               ret = relayd_create_session_2_15(rsock, session_name, hostname, base_path,
+                               session_live_timer, snapshot, sessiond_session_id, sessiond_uuid,
                                current_chunk_id, creation_time,
-                               session_name_contains_creation_time,
-                               &reply, output_path);
+                               session_name_contains_creation_time, &reply, output_path,
+                               trace_format);
        }
 
        if (ret < 0) {
@@ -1537,53 +1666,112 @@ end:
        return ret;
 }
 
-int relayd_get_configuration(struct lttcomm_relayd_sock *sock,
+enum lttng_error_code relayd_get_configuration(struct lttcomm_relayd_sock *sock,
                uint64_t query_flags,
-               uint64_t *result_flags)
+               uint64_t& result_flags,
+               uint64_t *trace_format_query_results)
 {
-       int ret = 0;
-       struct lttcomm_relayd_get_configuration msg = (typeof(msg)) {
-               .query_flags = htobe64(query_flags),
-       };
+       int ret;
+       enum lttng_error_code ret_code = LTTNG_OK;
+       struct lttcomm_relayd_get_configuration msg = {};
        struct lttcomm_relayd_get_configuration_reply reply = {};
+       lttng_dynamic_buffer buffer;
+       bool requesting_trace_format = query_flags &
+                       LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT;
+
+       lttng_dynamic_buffer_init(&buffer);
+
+       assert(!(query_flags & ~LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_MASK));
+       assert(!(requesting_trace_format && !trace_format_query_results));
 
        if (!relayd_supports_get_configuration(sock)) {
                DBG("Refusing to get relayd configuration (unsupported by relayd)");
-               if (query_flags) {
-                       ret = -1;
-                       goto end;
+               result_flags = 0;
+               if (trace_format_query_results) {
+                       *trace_format_query_results =
+                                       LTTCOMM_RELAYD_CONFIGURATION_TRACE_FORMAT_SUPPORTED_CTF1;
                }
-               *result_flags = 0;
                goto end;
        }
 
-       ret = send_command(sock, RELAYD_GET_CONFIGURATION, &msg, sizeof(msg),
-                       0);
+       if (requesting_trace_format && !relayd_supports_get_configuration_trace_format(sock)) {
+               /*
+                * Provide default value for that query since lttng-relayd does
+                * not know this query type.
+                */
+               if (trace_format_query_results) {
+                       *trace_format_query_results =
+                                       LTTCOMM_RELAYD_CONFIGURATION_TRACE_FORMAT_SUPPORTED_CTF1;
+               }
+               /* Remove from the query set. */
+               query_flags &= ~LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT;
+               requesting_trace_format = false;
+       }
+
+       msg.query_flags = htobe64(query_flags);
+
+       ret = send_command(sock, RELAYD_GET_CONFIGURATION, &msg, sizeof(msg), 0);
        if (ret < 0) {
                ERR("Failed to send get configuration command to relay daemon");
+               ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
                goto end;
        }
 
        ret = recv_reply(sock, &reply, sizeof(reply));
        if (ret < 0) {
                ERR("Failed to receive relay daemon get configuration command reply");
+               ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
                goto end;
        }
 
-       reply.generic.ret_code = be32toh(reply.generic.ret_code);
-       if (reply.generic.ret_code != LTTNG_OK) {
-               ret = -1;
-               ERR("Relayd get configuration replied error %d",
-                               reply.generic.ret_code);
-       } else {
-               reply.relayd_configuration_flags =
-                       be64toh(reply.relayd_configuration_flags);
-               ret = 0;
-               DBG("Relayd successfully got configuration: query_flags = %" PRIu64
-                               ", results_flags = %" PRIu64, query_flags,
-                               reply.relayd_configuration_flags);
-               *result_flags = reply.relayd_configuration_flags;
+       ret_code = static_cast<enum lttng_error_code>(be32toh(reply.generic.ret_code));
+       if (ret_code != LTTNG_OK) {
+               ERR("Relayd get configuration replied error %d", ret_code);
+               goto end;
+       }
+
+       result_flags = be64toh(reply.relayd_configuration_flags);
+       DBG("Relayd successfully got configuration: query_flags = %" PRIu64
+           ", results_flags = %" PRIu64,
+                       query_flags, result_flags);
+
+       if (!requesting_trace_format) {
+               ret_code = LTTNG_OK;
+               goto end;
        }
+
+       /* Receive trace formats */
+       {
+               lttcomm_relayd_get_configuration_specialized_query_reply query_flag_reply = {};
+               ret = recv_reply(sock, &query_flag_reply, sizeof(query_flag_reply));
+               if (ret < 0) {
+                       ERR("Failed to receive relay daemon get configuration query flag data");
+                       ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
+                       goto end;
+               }
+
+               query_flag_reply.query_flag = be64toh(query_flag_reply.query_flag);
+               LTTNG_ASSERT(query_flag_reply.query_flag &
+                               LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT);
+
+               query_flag_reply.payload_len = be64toh(query_flag_reply.payload_len);
+               LTTNG_ASSERT(query_flag_reply.payload_len == sizeof(uint64_t));
+
+               lttng_dynamic_buffer_set_size(&buffer, query_flag_reply.payload_len);
+
+               ret = recv_reply(sock, buffer.data, query_flag_reply.payload_len);
+               if (ret < 0) {
+                       ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
+                       ERR("Failed to receive configuration dynamic payload for flag TODO");
+                       goto end;
+               }
+
+               *trace_format_query_results = be64toh(*(uint64_t *) buffer.data);
+       }
+
+       ret_code = LTTNG_OK;
+
 end:
-       return ret;
+       lttng_dynamic_buffer_reset(&buffer);
+       return ret_code;
 }
This page took 0.029797 seconds and 5 git commands to generate.