* Return 0 on success, -1 on error.
*/
static
-int create_rotate_index_file(struct relay_stream *stream)
+int create_rotate_index_file(struct relay_stream *stream,
+ const char *stream_path)
{
int ret;
uint32_t major, minor;
}
major = stream->trace->session->major;
minor = stream->trace->session->minor;
- stream->index_file = lttng_index_file_create(stream->path_name,
+ stream->index_file = lttng_index_file_create(stream_path,
stream->channel_name,
-1, -1, stream->tracefile_size,
tracefile_array_get_file_index_head(stream->tfa),
/* Rotate also the index if the stream is not a metadata stream. */
if (!stream->is_metadata) {
- ret = create_rotate_index_file(stream);
+ ret = create_rotate_index_file(stream, stream->path_name);
if (ret < 0) {
ERR("Failed to rotate index file");
goto end;
goto end;
}
- ret = create_rotate_index_file(stream);
+ ret = create_rotate_index_file(stream, stream->path_name);
if (ret < 0) {
ERR("Rotate stream index file");
goto end;
goto end;
}
- trace_seq = min(stream->prev_seq, stream->prev_index_seq);
- if (stream->prev_seq == -1ULL || stream->prev_index_seq == -1ULL ||
+ trace_seq = min(stream->prev_data_seq, stream->prev_index_seq);
+ if (stream->prev_data_seq == -1ULL || stream->prev_index_seq == -1ULL ||
trace_seq < stream->rotate_at_seq_num) {
- DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+ DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
stream->stream_handle,
stream->rotate_at_seq_num,
- stream->prev_seq,
+ stream->prev_data_seq,
stream->prev_index_seq);
goto end;
- } else if (stream->prev_seq > stream->rotate_at_seq_num) {
+ } else if (stream->prev_data_seq > stream->rotate_at_seq_num) {
/*
- * prev_seq is checked here since indexes and rotation
+ * prev_data_seq is checked here since indexes and rotation
* commands are serialized with respect to each other.
*/
DBG("Rotation after too much data has been written in tracefile "
* It could mean that we received a rotation position
* that is in the past.
*/
- ERR("Stream %" PRIu64 " is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+ ERR("Stream %" PRIu64 " is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
stream->stream_handle,
stream->rotate_at_seq_num,
- stream->prev_seq,
+ stream->prev_data_seq,
stream->prev_index_seq);
ret = -1;
goto end;
* Ensure that both the index and stream data have been
* flushed up to the requested point.
*/
- stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+ stream_seq = min(stream->prev_data_seq, stream->prev_index_seq);
} else {
- stream_seq = stream->prev_seq;
+ stream_seq = stream->prev_data_seq;
}
- DBG("Data pending for stream id %" PRIu64 ": prev_seq %" PRIu64
+ DBG("Data pending for stream id %" PRIu64 ": prev_data_seq %" PRIu64
", prev_index_seq %" PRIu64
", and last_seq %" PRIu64, msg.stream_id,
- stream->prev_seq, stream->prev_index_seq,
+ stream->prev_data_seq, stream->prev_index_seq,
msg.last_net_seq_num);
/* Avoid wrapping issue */
* Ensure that both the index and stream data have been
* flushed up to the requested point.
*/
- stream_seq = min(stream->prev_seq, stream->prev_index_seq);
+ stream_seq = min(stream->prev_data_seq, stream->prev_index_seq);
} else {
- stream_seq = stream->prev_seq;
+ stream_seq = stream->prev_data_seq;
}
if (!stream->closed || !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
is_data_inflight = 1;
* Update the trace path (just the folder, the stream name does not
* change).
*/
- free(stream->path_name);
+ free(stream->prev_path_name);
+ stream->prev_path_name = stream->path_name;
stream->path_name = create_output_path(new_path_view.data);
if (!stream->path_name) {
ERR("Failed to create a new output path");
goto end;
}
+ DBG("MKDIR command has path \"%s\", changed to \"%s\"", path_view.data, path);
ret = utils_mkdir_recursive(path, S_IRWXU | S_IRWXG, -1, -1);
if (ret < 0) {
ERR("relay creating output directory");
goto end;
}
+ DBG("ROTATE_RENAME command has argument old path = \"%s\", new_path = \"%s\"",
+ old_path_view.data, new_path_view.data);
complete_old_path = create_output_path(old_path_view.data);
if (!complete_old_path) {
ERR("Failed to build old output path in rotate_rename command");
ret = -1;
goto end;
}
+ DBG("Expanded ROTATE_RENAME arguments to old path = \"%s\", new_path = \"%s\"",
+ complete_old_path, complete_new_path);
ret = utils_mkdir_recursive(complete_new_path, S_IRWXU | S_IRWXG,
-1, -1);
}
if (rotate_index || !stream->index_file) {
- ret = create_rotate_index_file(stream);
+ const char *stream_path;
+
+ /*
+ * The data connection creates the stream's first index file.
+ *
+ * This can happen _after_ a ROTATE_STREAM command. In
+ * other words, the data of the first packet of this stream
+ * can be received after a ROTATE_STREAM command.
+ *
+ * The ROTATE_STREAM command changes the stream's path_name
+ * to point to the "next" chunk. If a rotation is pending for
+ * this stream, as indicated by "rotate_at_seq_num != -1ULL",
+ * it means that we are still receiving data that belongs in the
+ * stream's former path.
+ *
+ * In this very specific case, we must ensure that the index
+ * file is created in the streams's former path,
+ * "prev_path_name".
+ *
+ * All other rotations beyond the first one are not affected
+ * by this problem since the actual rotation operation creates
+ * the new chunk's index file.
+ */
+ stream_path = stream->rotate_at_seq_num == -1ULL ?
+ stream->path_name:
+ stream->prev_path_name;
+
+ ret = create_rotate_index_file(stream, stream_path);
if (ret < 0) {
ERR("Failed to rotate index");
/* Put self-ref for this index due to error. */
stream->tracefile_size_current += state->header.data_size +
state->header.padding_size;
- if (stream->prev_seq == -1ULL) {
+ if (stream->prev_data_seq == -1ULL) {
new_stream = true;
}
if (index_flushed) {
stream->prev_index_seq = state->header.net_seq_num;
}
- stream->prev_seq = state->header.net_seq_num;
+ stream->prev_data_seq = state->header.net_seq_num;
/*
* Resetting the protocol state (to RECEIVE_HEADER) will trash the