Fix relayd: stream index file created in the wrong directory
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index c84ebaac1c2c66b3d6f282aa6e1d69d161e16f23..cf6e01ee0ffd1409269ffc9cfc0f2dc662261d04 100644 (file)
@@ -1539,7 +1539,8 @@ end:
  * Return 0 on success, -1 on error.
  */
 static
-int create_rotate_index_file(struct relay_stream *stream)
+int create_rotate_index_file(struct relay_stream *stream,
+               const char *stream_path)
 {
        int ret;
        uint32_t major, minor;
@@ -1551,7 +1552,7 @@ int create_rotate_index_file(struct relay_stream *stream)
        }
        major = stream->trace->session->major;
        minor = stream->trace->session->minor;
-       stream->index_file = lttng_index_file_create(stream->path_name,
+       stream->index_file = lttng_index_file_create(stream_path,
                        stream->channel_name,
                        -1, -1, stream->tracefile_size,
                        tracefile_array_get_file_index_head(stream->tfa),
@@ -1587,7 +1588,7 @@ int do_rotate_stream(struct relay_stream *stream)
 
        /* Rotate also the index if the stream is not a metadata stream. */
        if (!stream->is_metadata) {
-               ret = create_rotate_index_file(stream);
+               ret = create_rotate_index_file(stream, stream->path_name);
                if (ret < 0) {
                        ERR("Failed to rotate index file");
                        goto end;
@@ -1709,7 +1710,7 @@ int rotate_truncate_stream(struct relay_stream *stream)
                goto end;
        }
 
-       ret = create_rotate_index_file(stream);
+       ret = create_rotate_index_file(stream, stream->path_name);
        if (ret < 0) {
                ERR("Rotate stream index file");
                goto end;
@@ -1746,18 +1747,27 @@ static
 int try_rotate_stream(struct relay_stream *stream)
 {
        int ret = 0;
+       uint64_t trace_seq;
 
        /* No rotation expected. */
        if (stream->rotate_at_seq_num == -1ULL) {
                goto end;
        }
 
-       if (stream->prev_seq < stream->rotate_at_seq_num ||
-                       stream->prev_seq == -1ULL) {
-               DBG("Stream %" PRIu64 " no yet ready for rotation",
-                               stream->stream_handle);
+       trace_seq = min(stream->prev_data_seq, stream->prev_index_seq);
+       if (stream->prev_data_seq == -1ULL || stream->prev_index_seq == -1ULL ||
+                       trace_seq < stream->rotate_at_seq_num) {
+               DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_data_seq,
+                               stream->prev_index_seq);
                goto end;
-       } else if (stream->prev_seq > stream->rotate_at_seq_num) {
+       } else if (stream->prev_data_seq > stream->rotate_at_seq_num) {
+               /*
+                * prev_data_seq is checked here since indexes and rotation
+                * commands are serialized with respect to each other.
+                */
                DBG("Rotation after too much data has been written in tracefile "
                                "for stream %" PRIu64 ", need to truncate before "
                                "rotating", stream->stream_handle);
@@ -1767,7 +1777,20 @@ int try_rotate_stream(struct relay_stream *stream)
                        goto end;
                }
        } else {
-               /* stream->prev_seq == stream->rotate_at_seq_num */
+               if (trace_seq != stream->rotate_at_seq_num) {
+                       /*
+                        * Unexpected, protocol error/bug.
+                        * It could mean that we received a rotation position
+                        * that is in the past.
+                        */
+                       ERR("Stream %" PRIu64 " is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->rotate_at_seq_num,
+                               stream->prev_data_seq,
+                               stream->prev_index_seq);
+                       ret = -1;
+                       goto end;
+               }
                DBG("Stream %" PRIu64 " ready for rotation",
                                stream->stream_handle);
                ret = do_rotate_stream(stream);
@@ -1969,14 +1992,14 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                 * Ensure that both the index and stream data have been
                 * flushed up to the requested point.
                 */
-               stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+               stream_seq = min(stream->prev_data_seq, stream->prev_index_seq);
        } else {
-               stream_seq = stream->prev_seq;
+               stream_seq = stream->prev_data_seq;
        }
-       DBG("Data pending for stream id %" PRIu64 ": prev_seq %" PRIu64
+       DBG("Data pending for stream id %" PRIu64 ": prev_data_seq %" PRIu64
                        ", prev_index_seq %" PRIu64
                        ", and last_seq %" PRIu64, msg.stream_id,
-                       stream->prev_seq, stream->prev_index_seq,
+                       stream->prev_data_seq, stream->prev_index_seq,
                        msg.last_net_seq_num);
 
        /* Avoid wrapping issue */
@@ -2207,9 +2230,9 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                                 * Ensure that both the index and stream data have been
                                 * flushed up to the requested point.
                                 */
-                               stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+                               stream_seq = min(stream->prev_data_seq, stream->prev_index_seq);
                        } else {
-                               stream_seq = stream->prev_seq;
+                               stream_seq = stream->prev_data_seq;
                        }
                        if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
                                is_data_inflight = 1;
@@ -2504,7 +2527,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
         * Update the trace path (just the folder, the stream name does not
         * change).
         */
-       free(stream->path_name);
+       free(stream->prev_path_name);
+       stream->prev_path_name = stream->path_name;
        stream->path_name = create_output_path(new_path_view.data);
        if (!stream->path_name) {
                ERR("Failed to create a new output path");
@@ -2628,6 +2652,7 @@ static int relay_mkdir(const struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
 
+       DBG("MKDIR command has path \"%s\", changed to \"%s\"", path_view.data, path);
        ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1);
        if (ret < 0) {
                ERR("relay creating output directory");
@@ -2751,6 +2776,8 @@ static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
 
+       DBG("ROTATE_RENAME command has argument old path = \"%s\", new_path = \"%s\"",
+                       old_path_view.data, new_path_view.data);
        complete_old_path = create_output_path(old_path_view.data);
        if (!complete_old_path) {
                ERR("Failed to build old output path in rotate_rename command");
@@ -2764,6 +2791,8 @@ static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr,
                ret = -1;
                goto end;
        }
+       DBG("Expanded ROTATE_RENAME arguments to old path = \"%s\", new_path = \"%s\"",
+                       complete_old_path, complete_new_path);
 
        ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG,
                        -1, -1);
@@ -3223,7 +3252,34 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
        }
 
        if (rotate_index || !stream->index_file) {
-               ret = create_rotate_index_file(stream);
+               const char *stream_path;
+
+               /*
+                * The data connection creates the stream's first index file.
+                *
+                * This can happen _after_ a ROTATE_STREAM command. In
+                * other words, the data of the first packet of this stream
+                * can be received after a ROTATE_STREAM command.
+                *
+                * The ROTATE_STREAM command changes the stream's path_name
+                * to point to the "next" chunk. If a rotation is pending for
+                * this stream, as indicated by "rotate_at_seq_num != -1ULL",
+                * it means that we are still receiving data that belongs in the
+                * stream's former path.
+                *
+                * In this very specific case, we must ensure that the index
+                * file is created in the streams's former path,
+                * "prev_path_name".
+                *
+                * All other rotations beyond the first one are not affected
+                * by this problem since the actual rotation operation creates
+                * the new chunk's index file.
+                */
+               stream_path = stream->rotate_at_seq_num == -1ULL ?
+                               stream->path_name:
+                               stream->prev_path_name;
+
+               ret = create_rotate_index_file(stream, stream_path);
                if (ret < 0) {
                        ERR("Failed to rotate index");
                        /* Put self-ref for this index due to error. */
@@ -3505,7 +3561,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
        stream->tracefile_size_current += state->header.data_size +
                        state->header.padding_size;
 
-       if (stream->prev_seq == -1ULL) {
+       if (stream->prev_data_seq == -1ULL) {
                new_stream = true;
        }
        if (index_flushed) {
@@ -3514,7 +3570,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
                stream->prev_index_seq = state->header.net_seq_num;
        }
 
-       stream->prev_seq = state->header.net_seq_num;
+       stream->prev_data_seq = state->header.net_seq_num;
 
        /*
         * Resetting the protocol state (to RECEIVE_HEADER) will trash the
This page took 0.027788 seconds and 5 git commands to generate.