X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=cf6e01ee0ffd1409269ffc9cfc0f2dc662261d04;hp=c84ebaac1c2c66b3d6f282aa6e1d69d161e16f23;hb=cb523e0290a439cf57fa7823ffa78803500ba4c3;hpb=298a25cae73bda8d8ad6fa986f7d0b822645559b diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index c84ebaac1..cf6e01ee0 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1539,7 +1539,8 @@ end: * Return 0 on success, -1 on error. */ static -int create_rotate_index_file(struct relay_stream *stream) +int create_rotate_index_file(struct relay_stream *stream, + const char *stream_path) { int ret; uint32_t major, minor; @@ -1551,7 +1552,7 @@ int create_rotate_index_file(struct relay_stream *stream) } major = stream->trace->session->major; minor = stream->trace->session->minor; - stream->index_file = lttng_index_file_create(stream->path_name, + stream->index_file = lttng_index_file_create(stream_path, stream->channel_name, -1, -1, stream->tracefile_size, tracefile_array_get_file_index_head(stream->tfa), @@ -1587,7 +1588,7 @@ int do_rotate_stream(struct relay_stream *stream) /* Rotate also the index if the stream is not a metadata stream. */ if (!stream->is_metadata) { - ret = create_rotate_index_file(stream); + ret = create_rotate_index_file(stream, stream->path_name); if (ret < 0) { ERR("Failed to rotate index file"); goto end; @@ -1709,7 +1710,7 @@ int rotate_truncate_stream(struct relay_stream *stream) goto end; } - ret = create_rotate_index_file(stream); + ret = create_rotate_index_file(stream, stream->path_name); if (ret < 0) { ERR("Rotate stream index file"); goto end; @@ -1746,18 +1747,27 @@ static int try_rotate_stream(struct relay_stream *stream) { int ret = 0; + uint64_t trace_seq; /* No rotation expected. */ if (stream->rotate_at_seq_num == -1ULL) { goto end; } - if (stream->prev_seq < stream->rotate_at_seq_num || - stream->prev_seq == -1ULL) { - DBG("Stream %" PRIu64 " no yet ready for rotation", - stream->stream_handle); + trace_seq = min(stream->prev_data_seq, stream->prev_index_seq); + if (stream->prev_data_seq == -1ULL || stream->prev_index_seq == -1ULL || + trace_seq < stream->rotate_at_seq_num) { + DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_data_seq, + stream->prev_index_seq); goto end; - } else if (stream->prev_seq > stream->rotate_at_seq_num) { + } else if (stream->prev_data_seq > stream->rotate_at_seq_num) { + /* + * prev_data_seq is checked here since indexes and rotation + * commands are serialized with respect to each other. + */ DBG("Rotation after too much data has been written in tracefile " "for stream %" PRIu64 ", need to truncate before " "rotating", stream->stream_handle); @@ -1767,7 +1777,20 @@ int try_rotate_stream(struct relay_stream *stream) goto end; } } else { - /* stream->prev_seq == stream->rotate_at_seq_num */ + if (trace_seq != stream->rotate_at_seq_num) { + /* + * Unexpected, protocol error/bug. + * It could mean that we received a rotation position + * that is in the past. + */ + ERR("Stream %" PRIu64 " is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_data_seq, + stream->prev_index_seq); + ret = -1; + goto end; + } DBG("Stream %" PRIu64 " ready for rotation", stream->stream_handle); ret = do_rotate_stream(stream); @@ -1969,14 +1992,14 @@ static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, * Ensure that both the index and stream data have been * flushed up to the requested point. */ - stream_seq = min(stream->prev_seq, stream->prev_index_seq); + stream_seq = min(stream->prev_data_seq, stream->prev_index_seq); } else { - stream_seq = stream->prev_seq; + stream_seq = stream->prev_data_seq; } - DBG("Data pending for stream id %" PRIu64 ": prev_seq %" PRIu64 + DBG("Data pending for stream id %" PRIu64 ": prev_data_seq %" PRIu64 ", prev_index_seq %" PRIu64 ", and last_seq %" PRIu64, msg.stream_id, - stream->prev_seq, stream->prev_index_seq, + stream->prev_data_seq, stream->prev_index_seq, msg.last_net_seq_num); /* Avoid wrapping issue */ @@ -2207,9 +2230,9 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, * Ensure that both the index and stream data have been * flushed up to the requested point. */ - stream_seq = min(stream->prev_seq, stream->prev_index_seq); + stream_seq = min(stream->prev_data_seq, stream->prev_index_seq); } else { - stream_seq = stream->prev_seq; + stream_seq = stream->prev_data_seq; } if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) { is_data_inflight = 1; @@ -2504,7 +2527,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr * Update the trace path (just the folder, the stream name does not * change). */ - free(stream->path_name); + free(stream->prev_path_name); + stream->prev_path_name = stream->path_name; stream->path_name = create_output_path(new_path_view.data); if (!stream->path_name) { ERR("Failed to create a new output path"); @@ -2628,6 +2652,7 @@ static int relay_mkdir(const struct lttcomm_relayd_hdr *recv_hdr, goto end; } + DBG("MKDIR command has path \"%s\", changed to \"%s\"", path_view.data, path); ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1); if (ret < 0) { ERR("relay creating output directory"); @@ -2751,6 +2776,8 @@ static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr, goto end; } + DBG("ROTATE_RENAME command has argument old path = \"%s\", new_path = \"%s\"", + old_path_view.data, new_path_view.data); complete_old_path = create_output_path(old_path_view.data); if (!complete_old_path) { ERR("Failed to build old output path in rotate_rename command"); @@ -2764,6 +2791,8 @@ static int relay_rotate_rename(const struct lttcomm_relayd_hdr *recv_hdr, ret = -1; goto end; } + DBG("Expanded ROTATE_RENAME arguments to old path = \"%s\", new_path = \"%s\"", + complete_old_path, complete_new_path); ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG, -1, -1); @@ -3223,7 +3252,34 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, } if (rotate_index || !stream->index_file) { - ret = create_rotate_index_file(stream); + const char *stream_path; + + /* + * The data connection creates the stream's first index file. + * + * This can happen _after_ a ROTATE_STREAM command. In + * other words, the data of the first packet of this stream + * can be received after a ROTATE_STREAM command. + * + * The ROTATE_STREAM command changes the stream's path_name + * to point to the "next" chunk. If a rotation is pending for + * this stream, as indicated by "rotate_at_seq_num != -1ULL", + * it means that we are still receiving data that belongs in the + * stream's former path. + * + * In this very specific case, we must ensure that the index + * file is created in the streams's former path, + * "prev_path_name". + * + * All other rotations beyond the first one are not affected + * by this problem since the actual rotation operation creates + * the new chunk's index file. + */ + stream_path = stream->rotate_at_seq_num == -1ULL ? + stream->path_name: + stream->prev_path_name; + + ret = create_rotate_index_file(stream, stream_path); if (ret < 0) { ERR("Failed to rotate index"); /* Put self-ref for this index due to error. */ @@ -3505,7 +3561,7 @@ static enum relay_connection_status relay_process_data_receive_payload( stream->tracefile_size_current += state->header.data_size + state->header.padding_size; - if (stream->prev_seq == -1ULL) { + if (stream->prev_data_seq == -1ULL) { new_stream = true; } if (index_flushed) { @@ -3514,7 +3570,7 @@ static enum relay_connection_status relay_process_data_receive_payload( stream->prev_index_seq = state->header.net_seq_num; } - stream->prev_seq = state->header.net_seq_num; + stream->prev_data_seq = state->header.net_seq_num; /* * Resetting the protocol state (to RECEIVE_HEADER) will trash the