Fix: streaming and snapshot backward compat for relayd < 2.11
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 82e2603544c9b770ec0f60a1344f456835ddb76e..a2904eb08ae044faa2109338f4af5fe1d0c764b2 100644 (file)
@@ -37,6 +37,7 @@
 #include <inttypes.h>
 #include <urcu/futex.h>
 #include <urcu/uatomic.h>
+#include <urcu/rculist.h>
 #include <unistd.h>
 #include <fcntl.h>
 
@@ -58,7 +59,7 @@
 #include <common/config/session-config.h>
 #include <common/dynamic-buffer.h>
 #include <common/buffer-view.h>
-#include <urcu/rculist.h>
+#include <common/string-utils/format.h>
 
 #include "cmd.h"
 #include "ctf-trace.h"
@@ -1072,7 +1073,9 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr,
        char hostname[LTTNG_HOST_NAME_MAX] = {};
        uint32_t live_timer = 0;
        bool snapshot = false;
+       bool session_name_contains_creation_timestamp = false;
        /* Left nil for peers < 2.11. */
+       char base_path[LTTNG_PATH_MAX] = {};
        lttng_uuid sessiond_uuid = {};
        LTTNG_OPTIONAL(uint64_t) id_sessiond = {};
        LTTNG_OPTIONAL(uint64_t) current_chunk_id = {};
@@ -1093,9 +1096,10 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr,
 
                /* From 2.11 to ... */
                ret = cmd_create_session_2_11(payload, session_name, hostname,
-                               &live_timer, &snapshot, &id_sessiond_value,
+                               base_path, &live_timer, &snapshot, &id_sessiond_value,
                                sessiond_uuid, &has_current_chunk,
-                               &current_chunk_id_value, &creation_time_value);
+                               &current_chunk_id_value, &creation_time_value,
+                               &session_name_contains_creation_timestamp);
                if (lttng_uuid_is_nil(sessiond_uuid)) {
                        /* The nil UUID is reserved for pre-2.11 clients. */
                        ERR("Illegal nil UUID announced by peer in create session command");
@@ -1114,12 +1118,13 @@ static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr,
                goto send_reply;
        }
 
-       session = session_create(session_name, hostname, live_timer,
+       session = session_create(session_name, hostname, base_path, live_timer,
                        snapshot, sessiond_uuid,
                        id_sessiond.is_set ? &id_sessiond.value : NULL,
                        current_chunk_id.is_set ? &current_chunk_id.value : NULL,
                        creation_time.is_set ? &creation_time.value : NULL,
-                       conn->major, conn->minor);
+                       conn->major, conn->minor,
+                       session_name_contains_creation_timestamp);
        if (!session) {
                ret = -1;
                goto send_reply;
@@ -2116,7 +2121,7 @@ static int relay_rotate_session_streams(
                struct relay_connection *conn,
                const struct lttng_buffer_view *payload)
 {
-       int ret;
+       int ret = 0;
        uint32_t i;
        ssize_t send_ret;
        enum lttng_error_code reply_code = LTTNG_ERR_UNK;
@@ -2127,6 +2132,8 @@ static int relay_rotate_session_streams(
        const size_t header_len = sizeof(struct lttcomm_relayd_rotate_streams);
        struct lttng_trace_chunk *next_trace_chunk = NULL;
        struct lttng_buffer_view stream_positions;
+       char chunk_id_buf[MAX_INT_DEC_LEN(uint64_t)];
+       const char *chunk_id_str = "none";
 
        if (!session || !conn->version_check_done) {
                ERR("Trying to rotate a stream before version check");
@@ -2180,8 +2187,20 @@ static int relay_rotate_session_streams(
                        ret = -1;
                        goto end;
                }
+
+               ret = snprintf(chunk_id_buf, sizeof(chunk_id_buf), "%" PRIu64,
+                               rotate_streams.new_chunk_id.value);
+               if (ret < 0 || ret >= sizeof(chunk_id_buf)) {
+                       chunk_id_str = "formatting error";
+               } else {
+                       chunk_id_str = chunk_id_buf;
+               }
        }
 
+       DBG("Rotate %" PRIu32 " streams of session \"%s\" to chunk \"%s\"",
+                       rotate_streams.stream_count, session->session_name,
+                       chunk_id_str);
+
        stream_positions = lttng_buffer_view_from_view(payload,
                        sizeof(rotate_streams), -1);
        if (!stream_positions.data ||
@@ -2237,6 +2256,7 @@ end:
                ret = -1;
        }
 
+       ret = 0;
 end_no_reply:
        lttng_trace_chunk_put(next_trace_chunk);
        return ret;
@@ -2246,29 +2266,81 @@ static int init_session_output_directory_handle(struct relay_session *session,
                struct lttng_directory_handle *handle)
 {
        int ret;
-       /* hostname/session_name */
+       /*
+        * session_directory:
+        *
+        * if base_path is \0'
+        *   hostname/session_name
+        * else
+        *   hostname/base_path
+        */
        char *session_directory = NULL;
        /*
-        * base path + session_directory
+        * relayd_output_path/session_directory
         * e.g. /home/user/lttng-traces/hostname/session_name
         */
        char *full_session_path = NULL;
-       char creation_time_str[16];
-       struct tm *timeinfo;
 
-       assert(session->creation_time.is_set);
-       timeinfo = localtime(&session->creation_time.value);
-       if (!timeinfo) {
-               ret = -1;
-               goto end;
-       }
-       strftime(creation_time_str, sizeof(creation_time_str), "%Y%m%d-%H%M%S",
-                       timeinfo);
+       /*
+        * If base path is set, it overrides the session name for the
+        * session relative base path. No timestamp is appended if the
+        * base path is overridden.
+        *
+        * If the session name already contains the creation time (e.g.
+        * auto-<timestamp>, don't append yet another timestamp after
+        * the session name in the generated path.
+        *
+        * Otherwise, generate the path with session_name-<timestamp>.
+        */
+       if (session->base_path[0] != '\0') {
+               pthread_mutex_lock(&session->lock);
+               ret = asprintf(&session_directory, "%s/%s", session->hostname,
+                               session->base_path);
+               pthread_mutex_unlock(&session->lock);
+       } else if (session->session_name_contains_creation_time) {
+               pthread_mutex_lock(&session->lock);
+               ret = asprintf(&session_directory, "%s/%s", session->hostname,
+                               session->session_name);
+               pthread_mutex_unlock(&session->lock);
+       } else {
+               char session_creation_datetime[16];
+               size_t strftime_ret;
+               struct tm *timeinfo;
+               time_t creation_time;
 
-       pthread_mutex_lock(&session->lock);
-       ret = asprintf(&session_directory, "%s/%s-%s", session->hostname,
-                       session->session_name, creation_time_str);
-       pthread_mutex_unlock(&session->lock);
+               /*
+                * The 2.11+ protocol guarantees that a creation time
+                * is provided for a session. This would indicate a
+                * protocol error or an improper use of this util.
+                */
+               if (!session->creation_time.is_set) {
+                       ERR("Creation time missing for session \"%s\" (protocol error)",
+                                       session->session_name);
+                       ret = -1;
+                       goto end;
+               }
+               creation_time = LTTNG_OPTIONAL_GET(session->creation_time);
+
+               timeinfo = localtime(&creation_time);
+               if (!timeinfo) {
+                       ERR("Failed to get timeinfo while initializing session output directory handle");
+                       ret = -1;
+                       goto end;
+               }
+               strftime_ret = strftime(session_creation_datetime,
+                               sizeof(session_creation_datetime),
+                               "%Y%m%d-%H%M%S", timeinfo);
+               if (strftime_ret == 0) {
+                       ERR("Failed to format session creation timestamp while initializing session output directory handle");
+                       ret = -1;
+                       goto end;
+               }
+               pthread_mutex_lock(&session->lock);
+               ret = asprintf(&session_directory, "%s/%s-%s",
+                               session->hostname, session->session_name,
+                               session_creation_datetime);
+               pthread_mutex_unlock(&session->lock);
+       }
        if (ret < 0) {
                PERROR("Failed to format session directory name");
                goto end;
@@ -2423,11 +2495,22 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
        }
 
        pthread_mutex_lock(&conn->session->lock);
-       lttng_trace_chunk_put(conn->session->current_trace_chunk);
+       if (conn->session->pending_closure_trace_chunk) {
+               /*
+                * Invalid; this means a second create_trace_chunk command was
+                * received before a close_trace_chunk.
+                */
+               ERR("Invalid trace chunk close command received; a trace chunk is already waiting for a trace chunk close command");
+               reply_code = LTTNG_ERR_INVALID_PROTOCOL;
+               ret = -1;
+               goto end_unlock_session;
+       }
+       conn->session->pending_closure_trace_chunk =
+                       conn->session->current_trace_chunk;
        conn->session->current_trace_chunk = published_chunk;
        published_chunk = NULL;
+end_unlock_session:
        pthread_mutex_unlock(&conn->session->lock);
-
 end:
        reply.ret_code = htobe32((uint32_t) reply_code);
        send_ret = conn->sock->ops->sendmsg(conn->sock,
@@ -2512,13 +2595,23 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
 
+       pthread_mutex_lock(&session->lock);
+       if (session->pending_closure_trace_chunk &&
+                       session->pending_closure_trace_chunk != chunk) {
+               ERR("Trace chunk close command for session \"%s\" does not target the trace chunk pending closure",
+                               session->session_name);
+               reply_code = LTTNG_ERR_INVALID_PROTOCOL;
+               ret = -1;
+               goto end_unlock_session;
+       }
+
        chunk_status = lttng_trace_chunk_set_close_timestamp(
                        chunk, close_timestamp);
        if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ERR("Failed to set trace chunk close timestamp");
                ret = -1;
                reply_code = LTTNG_ERR_UNK;
-               goto end;
+               goto end_unlock_session;
        }
 
        if (close_command.is_set) {
@@ -2527,11 +2620,10 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
                if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                        ret = -1;
                        reply_code = LTTNG_ERR_INVALID;
-                       goto end;
+                       goto end_unlock_session;
                }
        }
 
-       pthread_mutex_lock(&session->lock);
        if (session->current_trace_chunk == chunk) {
                /*
                 * After a trace chunk close command, no new streams
@@ -2544,6 +2636,9 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
                lttng_trace_chunk_put(session->current_trace_chunk);
                session->current_trace_chunk = NULL;
        }
+       lttng_trace_chunk_put(session->pending_closure_trace_chunk);
+       session->pending_closure_trace_chunk = NULL;
+end_unlock_session:
        pthread_mutex_unlock(&session->lock);
 
 end:
This page took 0.027606 seconds and 5 git commands to generate.