relayd: implement file and session rotation on top of trace chunks
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 1 Aug 2019 18:42:32 +0000 (14:42 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 9 Aug 2019 15:28:43 +0000 (11:28 -0400)
Implement the file and session rotation functionality on top of the
trace chunk API. This ensures that a relay_stream and lttng_index_file
are always explicitly associated to a trace chunk and hold a reference
to it as long as their underlying files are contained within a given
trace chunk.

A number of relay_stream specific functions are moved to stream.c as
"methods" of the relay_stream interface in order to make use of
internal relay_stream helpers.

As part of this clean-up/move of the relay_stream code, raw payload
buffer handling has been replaced to use the lttng_buffer_view
interface which provides implicit bounds checking of the payload
buffers.

The stream rotation has been modified to reference a "new chunk id"
which is the ID of the trace chunk to which streams should rotate
"into". The command has also been modified to apply on a set of
streams. This is done in order to limit the number of commands on the
control socket. Conversely, all path names have been removed from the
command's payload.

The index file implementation now acquires a reference to the trace
chunk from which it is created. This affects the consumer daemon as
this code is shared with the relay daemon. This ensures that a chunk
is not released (and its close command executed, if any) before all
file descriptors related to it have been closed. Respecting this
guarantee is very important as the upcoming fd-cache will remove
the guarantee that an "fd" to a given file is always held open.
Moreover, close commands can rename a trace chunk's folders which
would cause files to be created in the wrong folder if they are
not properly created through the trace chunk.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
23 files changed:
include/lttng/lttng-error.h
src/bin/lttng-relayd/ctf-trace.h
src/bin/lttng-relayd/index.c
src/bin/lttng-relayd/index.h
src/bin/lttng-relayd/main.c
src/bin/lttng-relayd/session.c
src/bin/lttng-relayd/sessiond-trace-chunks.c
src/bin/lttng-relayd/stream.c
src/bin/lttng-relayd/stream.h
src/common/compat/directory-handle.c
src/common/consumer/consumer-stream.h
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/error.c
src/common/index/index.c
src/common/index/index.h
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/relayd.h
src/common/sessiond-comm/sessiond-comm.h
src/common/trace-chunk.c
src/common/utils.c
src/common/utils.h

index d6e72df98dcb2d11c5eb950e7fc9d5d906129f2d..e1c6ab0a8de61d4836a8b95e1fd82a3d85b9debc 100644 (file)
@@ -173,6 +173,7 @@ enum lttng_error_code {
        LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER      = 150, /* trace chunk close failure on consumer */
        LTTNG_ERR_TRACE_CHUNK_EXISTS_FAIL_CONSUMER     = 151, /* failed to query consumer for trace chunk existence */
        LTTNG_ERR_INVALID_PROTOCOL                     = 152, /* a protocol error occurred */
        LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER      = 150, /* trace chunk close failure on consumer */
        LTTNG_ERR_TRACE_CHUNK_EXISTS_FAIL_CONSUMER     = 151, /* failed to query consumer for trace chunk existence */
        LTTNG_ERR_INVALID_PROTOCOL                     = 152, /* a protocol error occurred */
+       LTTNG_ERR_FILE_CREATION_ERROR                  = 153, /* failed to create a file */
 
        /* MUST be last element */
        LTTNG_ERR_NR,                           /* Last element */
 
        /* MUST be last element */
        LTTNG_ERR_NR,                           /* Last element */
index 8efd1e430cf6b18841e600552221ed9b69f90b5c..120549633c082fa9a6b00313787322f826be1863 100644 (file)
@@ -35,7 +35,6 @@ struct ctf_trace {
 
        /* Trace sub-folder relative to the session output path. */
        char *path;
 
        /* Trace sub-folder relative to the session output path. */
        char *path;
-       bool index_folder_created;
 
        /*
         * The ctf_trace lock nests inside the session lock.
 
        /*
         * The ctf_trace lock nests inside the session lock.
index 92d4581d124a95921694129cba29213c08a7bd90..3cae94e8e57525d37cdddcd3526c010c6105d5cd 100644 (file)
@@ -27,6 +27,7 @@
 #include "lttng-relayd.h"
 #include "stream.h"
 #include "index.h"
 #include "lttng-relayd.h"
 #include "stream.h"
 #include "index.h"
+#include "connection.h"
 
 /*
  * Allocate a new relay index object. Pass the stream in which it is
 
 /*
  * Allocate a new relay index object. Pass the stream in which it is
@@ -411,3 +412,28 @@ end:
        rcu_read_unlock();
        return ret;
 }
        rcu_read_unlock();
        return ret;
 }
+
+/*
+ * Set index data from the control port to a given index object.
+ */
+int relay_index_set_control_data(struct relay_index *index,
+               const struct lttcomm_relayd_index *data,
+               unsigned int minor_version)
+{
+       /* The index on disk is encoded in big endian. */
+       const struct ctf_packet_index index_data = {
+               .packet_size = htobe64(data->packet_size),
+               .content_size = htobe64(data->content_size),
+               .timestamp_begin = htobe64(data->timestamp_begin),
+               .timestamp_end = htobe64(data->timestamp_end),
+               .events_discarded = htobe64(data->events_discarded),
+               .stream_id = htobe64(data->stream_id),
+       };
+
+       if (minor_version >= 8) {
+               index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
+               index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
+       }
+
+       return relay_index_set_data(index, &index_data);
+}
index 08388c001872a221cbdd6f42960513619ca42759..8466e91a468b501cc7bc14ac2a2aaeac0f1e87a2 100644 (file)
@@ -29,6 +29,8 @@
 #include "stream-fd.h"
 
 struct relay_stream;
 #include "stream-fd.h"
 
 struct relay_stream;
+struct relay_connection;
+struct lttcomm_relayd_index;
 
 struct relay_index {
        /*
 
 struct relay_index {
        /*
@@ -76,5 +78,8 @@ void relay_index_close_all(struct relay_stream *stream);
 void relay_index_close_partial_fd(struct relay_stream *stream);
 uint64_t relay_index_find_last(struct relay_stream *stream);
 int relay_index_switch_all_files(struct relay_stream *stream);
 void relay_index_close_partial_fd(struct relay_stream *stream);
 uint64_t relay_index_find_last(struct relay_stream *stream);
 int relay_index_switch_all_files(struct relay_stream *stream);
+int relay_index_set_control_data(struct relay_index *index,
+               const struct lttcomm_relayd_index *data,
+               unsigned int minor_version);
 
 #endif /* _RELAY_INDEX_H */
 
 #endif /* _RELAY_INDEX_H */
index dddc2a2b4f77b25c9ba968826460147302f25e0a..82e2603544c9b770ec0f60a1344f456835ddb76e 100644 (file)
@@ -105,7 +105,6 @@ static int lttng_relay_ready = NR_LTTNG_RELAY_READY;
 
 /* Size of receive buffer. */
 #define RECV_DATA_BUFFER_SIZE          65536
 
 /* Size of receive buffer. */
 #define RECV_DATA_BUFFER_SIZE          65536
-#define FILE_COPY_BUFFER_SIZE          65536
 
 static int recv_child_signal;  /* Set to 1 when a SIGUSR1 signal is received. */
 static pid_t child_ppid;       /* Internal parent PID use with daemonize. */
 
 static int recv_child_signal;  /* Set to 1 when a SIGUSR1 signal is received. */
 static pid_t child_ppid;       /* Internal parent PID use with daemonize. */
@@ -1051,33 +1050,6 @@ error_testpoint:
        return NULL;
 }
 
        return NULL;
 }
 
-/*
- * Set index data from the control port to a given index object.
- */
-static int set_index_control_data(struct relay_index *index,
-               struct lttcomm_relayd_index *data,
-               struct relay_connection *conn)
-{
-       struct ctf_packet_index index_data;
-
-       /*
-        * The index on disk is encoded in big endian.
-        */
-       index_data.packet_size = htobe64(data->packet_size);
-       index_data.content_size = htobe64(data->content_size);
-       index_data.timestamp_begin = htobe64(data->timestamp_begin);
-       index_data.timestamp_end = htobe64(data->timestamp_end);
-       index_data.events_discarded = htobe64(data->events_discarded);
-       index_data.stream_id = htobe64(data->stream_id);
-
-       if (conn->minor >= 8) {
-               index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
-               index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
-       }
-
-       return relay_index_set_data(index, &index_data);
-}
-
 static bool session_streams_have_index(const struct relay_session *session)
 {
        return session->minor >= 4 && !session->snapshot;
 static bool session_streams_have_index(const struct relay_session *session)
 {
        return session->minor >= 4 && !session->snapshot;
@@ -1475,15 +1447,14 @@ int relay_reset_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
                goto end_unlock;
        }
 
                goto end_unlock;
        }
 
-       ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
-                       0, 0, -1, -1, stream->stream_fd->fd, NULL,
-                       &stream->stream_fd->fd);
+       ret = stream_reset_file(stream);
        if (ret < 0) {
        if (ret < 0) {
-               ERR("Failed to rotate metadata file %s of channel %s",
-                               stream->path_name, stream->channel_name);
+               ERR("Failed to reset metadata stream %" PRIu64
+                               ": stream_path = %s, channel = %s",
+                               stream->stream_handle, stream->path_name,
+                               stream->channel_name);
                goto end_unlock;
        }
                goto end_unlock;
        }
-
 end_unlock:
        pthread_mutex_unlock(&stream->lock);
        stream_put(stream);
 end_unlock:
        pthread_mutex_unlock(&stream->lock);
        stream_put(stream);
@@ -1554,371 +1525,6 @@ static int relay_start(const struct lttcomm_relayd_hdr *recv_hdr,
        return ret;
 }
 
        return ret;
 }
 
-/*
- * Append padding to the file pointed by the file descriptor fd.
- */
-static int write_padding_to_file(int fd, uint32_t size)
-{
-       ssize_t ret = 0;
-       char *zeros;
-
-       if (size == 0) {
-               goto end;
-       }
-
-       zeros = zmalloc(size);
-       if (zeros == NULL) {
-               PERROR("zmalloc zeros for padding");
-               ret = -1;
-               goto end;
-       }
-
-       ret = lttng_write(fd, zeros, size);
-       if (ret < size) {
-               PERROR("write padding to file");
-       }
-
-       free(zeros);
-
-end:
-       return ret;
-}
-
-/*
- * Close the current index file if it is open, and create a new one.
- *
- * Return 0 on success, -1 on error.
- */
-static
-int create_rotate_index_file(struct relay_stream *stream,
-               const char *channel_path)
-{
-       int ret;
-       uint32_t major, minor;
-
-       ASSERT_LOCKED(stream->lock);
-
-       /* Put ref on previous index_file. */
-       if (stream->index_file) {
-               lttng_index_file_put(stream->index_file);
-               stream->index_file = NULL;
-       }
-       major = stream->trace->session->major;
-       minor = stream->trace->session->minor;
-       if (!stream->trace->index_folder_created) {
-               char *index_subpath = NULL;
-
-               ret = asprintf(&index_subpath, "%s/%s", channel_path, DEFAULT_INDEX_DIR);
-               if (ret < 0) {
-                       goto end;
-               }
-
-               ret = lttng_trace_chunk_create_subdirectory(stream->trace_chunk, index_subpath);
-               free(index_subpath);
-               if (ret) {
-                       goto end;
-               }
-               stream->trace->index_folder_created = true;
-       }
-       stream->index_file = lttng_index_file_create_from_trace_chunk(
-                       stream->trace_chunk, channel_path, stream->channel_name,
-                       stream->tracefile_size, stream->tracefile_count,
-                       lttng_to_index_major(major, minor),
-                       lttng_to_index_minor(major, minor), true);
-       if (!stream->index_file) {
-               ret = -1;
-               goto end;
-       }
-
-       ret = 0;
-
-end:
-       return ret;
-}
-
-static
-int do_rotate_stream_data(struct relay_stream *stream)
-{
-       int ret;
-
-       DBG("Rotating stream %" PRIu64 " data file",
-                       stream->stream_handle);
-       /* Perform the stream rotation. */
-       ret = utils_rotate_stream_file(stream->path_name,
-                       stream->channel_name, stream->tracefile_size,
-                       stream->tracefile_count, -1,
-                       -1, stream->stream_fd->fd,
-                       NULL, &stream->stream_fd->fd);
-       if (ret < 0) {
-               ERR("Rotating stream output file");
-               goto end;
-       }
-       stream->tracefile_size_current = 0;
-       stream->pos_after_last_complete_data_index = 0;
-       stream->data_rotated = true;
-
-       if (stream->data_rotated && stream->index_rotated) {
-               /* Rotation completed; reset its state. */
-               DBG("Rotation completed for stream %" PRIu64,
-                               stream->stream_handle);
-               stream->rotate_at_seq_num = -1ULL;
-               stream->data_rotated = false;
-               stream->index_rotated = false;
-       }
-end:
-       return ret;
-}
-
-/*
- * If too much data has been written in a tracefile before we received the
- * rotation command, we have to move the excess data to the new tracefile and
- * perform the rotation. This can happen because the control and data
- * connections are separate, the indexes as well as the commands arrive from
- * the control connection and we have no control over the order so we could be
- * in a situation where too much data has been received on the data connection
- * before the rotation command on the control connection arrives.
- */
-static
-int rotate_truncate_stream(struct relay_stream *stream)
-{
-       int ret, new_fd;
-       off_t lseek_ret;
-       uint64_t diff, pos = 0;
-       char buf[FILE_COPY_BUFFER_SIZE];
-
-       assert(!stream->is_metadata);
-
-       assert(stream->tracefile_size_current >
-                       stream->pos_after_last_complete_data_index);
-       diff = stream->tracefile_size_current -
-                       stream->pos_after_last_complete_data_index;
-
-       /* Create the new tracefile. */
-       new_fd = utils_create_stream_file(stream->path_name,
-                       stream->channel_name,
-                       stream->tracefile_size, stream->tracefile_count,
-                       /* uid */ -1, /* gid */ -1, /* suffix */ NULL);
-       if (new_fd < 0) {
-               ERR("Failed to create new stream file at path %s for channel %s",
-                               stream->path_name, stream->channel_name);
-               ret = -1;
-               goto end;
-       }
-
-       /*
-        * Rewind the current tracefile to the position at which the rotation
-        * should have occurred.
-        */
-       lseek_ret = lseek(stream->stream_fd->fd,
-                       stream->pos_after_last_complete_data_index, SEEK_SET);
-       if (lseek_ret < 0) {
-               PERROR("seek truncate stream");
-               ret = -1;
-               goto end;
-       }
-
-       /* Move data from the old file to the new file. */
-       while (pos < diff) {
-               uint64_t count, bytes_left;
-               ssize_t io_ret;
-
-               bytes_left = diff - pos;
-               count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left;
-               assert(count <= SIZE_MAX);
-
-               io_ret = lttng_read(stream->stream_fd->fd, buf, count);
-               if (io_ret < (ssize_t) count) {
-                       char error_string[256];
-
-                       snprintf(error_string, sizeof(error_string),
-                                       "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
-                                       count, stream->stream_fd->fd, io_ret);
-                       if (io_ret == -1) {
-                               PERROR("%s", error_string);
-                       } else {
-                               ERR("%s", error_string);
-                       }
-                       ret = -1;
-                       goto end;
-               }
-
-               io_ret = lttng_write(new_fd, buf, count);
-               if (io_ret < (ssize_t) count) {
-                       char error_string[256];
-
-                       snprintf(error_string, sizeof(error_string),
-                                       "Failed to write %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
-                                       count, new_fd, io_ret);
-                       if (io_ret == -1) {
-                               PERROR("%s", error_string);
-                       } else {
-                               ERR("%s", error_string);
-                       }
-                       ret = -1;
-                       goto end;
-               }
-
-               pos += count;
-       }
-
-       /* Truncate the file to get rid of the excess data. */
-       ret = ftruncate(stream->stream_fd->fd,
-                       stream->pos_after_last_complete_data_index);
-       if (ret) {
-               PERROR("ftruncate");
-               goto end;
-       }
-
-       ret = close(stream->stream_fd->fd);
-       if (ret < 0) {
-               PERROR("Closing tracefile");
-               goto end;
-       }
-
-       /*
-        * Update the offset and FD of all the eventual indexes created by the
-        * data connection before the rotation command arrived.
-        */
-       ret = relay_index_switch_all_files(stream);
-       if (ret < 0) {
-               ERR("Failed to rotate index file");
-               goto end;
-       }
-
-       stream->stream_fd->fd = new_fd;
-       stream->tracefile_size_current = diff;
-       stream->pos_after_last_complete_data_index = 0;
-       stream->rotate_at_seq_num = -1ULL;
-
-       ret = 0;
-
-end:
-       return ret;
-}
-
-/*
- * Check if a stream's index file should be rotated (for session rotation).
- * Must be called with the stream lock held.
- *
- * Return 0 on success, a negative value on error.
- */
-static
-int try_rotate_stream_index(struct relay_stream *stream)
-{
-       int ret = 0;
-
-       if (stream->rotate_at_seq_num == -1ULL) {
-               /* No rotation expected. */
-               goto end;
-       }
-
-       if (stream->index_rotated) {
-               /* Rotation of the index has already occurred. */
-               goto end;
-       }
-
-       if (stream->prev_index_seq == -1ULL ||
-                       stream->prev_index_seq < stream->rotate_at_seq_num) {
-               DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
-                               stream->stream_handle,
-                               stream->rotate_at_seq_num,
-                               stream->prev_index_seq);
-               goto end;
-       } else if (stream->prev_index_seq != stream->rotate_at_seq_num) {
-               /*
-                * Unexpected, protocol error/bug.
-                * It could mean that we received a rotation position
-                * that is in the past.
-                */
-               ERR("Stream %" PRIu64 " index is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
-                               stream->stream_handle,
-                               stream->rotate_at_seq_num,
-                               stream->prev_data_seq,
-                               stream->prev_index_seq);
-               ret = -1;
-               goto end;
-       } else {
-               DBG("Rotating stream %" PRIu64 " index file",
-                               stream->stream_handle);
-               ret = create_rotate_index_file(stream, stream->path_name);
-               stream->index_rotated = true;
-
-               if (stream->data_rotated && stream->index_rotated) {
-                       /* Rotation completed; reset its state. */
-                       DBG("Rotation completed for stream %" PRIu64,
-                                       stream->stream_handle);
-                       stream->rotate_at_seq_num = -1ULL;
-                       stream->data_rotated = false;
-                       stream->index_rotated = false;
-               }
-       }
-
-end:
-       return ret;
-}
-
-/*
- * Check if a stream's data file (as opposed to index) should be rotated
- * (for session rotation).
- * Must be called with the stream lock held.
- *
- * Return 0 on success, a negative value on error.
- */
-static
-int try_rotate_stream_data(struct relay_stream *stream)
-{
-       int ret = 0;
-
-       if (stream->rotate_at_seq_num == -1ULL) {
-               /* No rotation expected. */
-               goto end;
-       }
-
-       if (stream->data_rotated) {
-               /* Rotation of the data file has already occurred. */
-               goto end;
-       }
-
-       if (stream->prev_data_seq == -1ULL ||
-                       stream->prev_data_seq < stream->rotate_at_seq_num) {
-               DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")",
-                               stream->stream_handle,
-                               stream->rotate_at_seq_num,
-                               stream->prev_data_seq);
-               goto end;
-       } else if (stream->prev_data_seq > stream->rotate_at_seq_num) {
-               /*
-                * prev_data_seq is checked here since indexes and rotation
-                * commands are serialized with respect to each other.
-                */
-               DBG("Rotation after too much data has been written in tracefile "
-                               "for stream %" PRIu64 ", need to truncate before "
-                               "rotating", stream->stream_handle);
-               ret = rotate_truncate_stream(stream);
-               if (ret) {
-                       ERR("Failed to truncate stream");
-                       goto end;
-               }
-       } else if (stream->prev_data_seq != stream->rotate_at_seq_num) {
-               /*
-                * Unexpected, protocol error/bug.
-                * It could mean that we received a rotation position
-                * that is in the past.
-                */
-               ERR("Stream %" PRIu64 " data is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")",
-                               stream->stream_handle,
-                               stream->rotate_at_seq_num,
-                               stream->prev_data_seq);
-               ret = -1;
-               goto end;
-       } else {
-               ret = do_rotate_stream_data(stream);
-       }
-
-end:
-       return ret;
-}
-
 /*
  * relay_recv_metadata: receive the metadata for the session.
  */
 /*
  * relay_recv_metadata: receive the metadata for the session.
  */
@@ -1927,11 +1533,11 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
                const struct lttng_buffer_view *payload)
 {
        int ret = 0;
                const struct lttng_buffer_view *payload)
 {
        int ret = 0;
-       ssize_t size_ret;
        struct relay_session *session = conn->session;
        struct lttcomm_relayd_metadata_payload metadata_payload_header;
        struct relay_stream *metadata_stream;
        uint64_t metadata_payload_size;
        struct relay_session *session = conn->session;
        struct lttcomm_relayd_metadata_payload metadata_payload_header;
        struct relay_stream *metadata_stream;
        uint64_t metadata_payload_size;
+       struct lttng_buffer_view packet_view;
 
        if (!session) {
                ERR("Metadata sent before version check");
 
        if (!session) {
                ERR("Metadata sent before version check");
@@ -1960,36 +1566,23 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
 
                goto end;
        }
 
-       pthread_mutex_lock(&metadata_stream->lock);
-
-       size_ret = lttng_write(metadata_stream->stream_fd->fd,
-                       payload->data + sizeof(metadata_payload_header),
-                       metadata_payload_size);
-       if (size_ret < metadata_payload_size) {
-               ERR("Relay error writing metadata on file");
+       packet_view = lttng_buffer_view_from_view(payload,
+                       sizeof(metadata_payload_header), metadata_payload_size);
+       if (!packet_view.data) {
+               ERR("Invalid metadata packet length announced by header");
                ret = -1;
                goto end_put;
        }
 
                ret = -1;
                goto end_put;
        }
 
-       size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+       pthread_mutex_lock(&metadata_stream->lock);
+       ret = stream_write(metadata_stream, &packet_view,
                        metadata_payload_header.padding_size);
                        metadata_payload_header.padding_size);
-       if (size_ret < (int64_t) metadata_payload_header.padding_size) {
+       pthread_mutex_unlock(&metadata_stream->lock);
+       if (ret){
                ret = -1;
                goto end_put;
        }
                ret = -1;
                goto end_put;
        }
-
-       metadata_stream->metadata_received +=
-               metadata_payload_size + metadata_payload_header.padding_size;
-       DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
-               metadata_stream->metadata_received);
-
-       ret = try_rotate_stream_data(metadata_stream);
-       if (ret < 0) {
-               goto end_put;
-       }
-
 end_put:
 end_put:
-       pthread_mutex_unlock(&metadata_stream->lock);
        stream_put(metadata_stream);
 end:
        return ret;
        stream_put(metadata_stream);
 end:
        return ret;
@@ -2397,7 +1990,6 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr,
        ssize_t send_ret;
        struct relay_session *session = conn->session;
        struct lttcomm_relayd_index index_info;
        ssize_t send_ret;
        struct relay_session *session = conn->session;
        struct lttcomm_relayd_index index_info;
-       struct relay_index *index;
        struct lttcomm_relayd_generic_reply reply;
        struct relay_stream *stream;
        size_t msg_len;
        struct lttcomm_relayd_generic_reply reply;
        struct relay_stream *stream;
        size_t msg_len;
@@ -2443,73 +2035,17 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr,
                ret = -1;
                goto end;
        }
                ret = -1;
                goto end;
        }
-       pthread_mutex_lock(&stream->lock);
-
-       /* Live beacon handling */
-       if (index_info.packet_size == 0) {
-               DBG("Received live beacon for stream %" PRIu64,
-                               stream->stream_handle);
-
-               /*
-                * Only flag a stream inactive when it has already
-                * received data and no indexes are in flight.
-                */
-               if (stream->index_received_seqcount > 0
-                               && stream->indexes_in_flight == 0) {
-                       stream->beacon_ts_end = index_info.timestamp_end;
-               }
-               ret = 0;
-               goto end_stream_put;
-       } else {
-               stream->beacon_ts_end = -1ULL;
-       }
 
 
-       if (stream->ctf_stream_id == -1ULL) {
-               stream->ctf_stream_id = index_info.stream_id;
-       }
-       index = relay_index_get_by_id_or_create(stream, index_info.net_seq_num);
-       if (!index) {
-               ret = -1;
-               ERR("relay_index_get_by_id_or_create index NULL");
-               goto end_stream_put;
-       }
-       if (set_index_control_data(index, &index_info, conn)) {
-               ERR("set_index_control_data error");
-               relay_index_put(index);
-               ret = -1;
+       pthread_mutex_lock(&stream->lock);
+       ret = stream_add_index(stream, &index_info);
+       pthread_mutex_unlock(&stream->lock);
+       if (ret) {
                goto end_stream_put;
        }
                goto end_stream_put;
        }
-       ret = relay_index_try_flush(index);
-       if (ret == 0) {
-               tracefile_array_commit_seq(stream->tfa);
-               stream->index_received_seqcount++;
-               stream->pos_after_last_complete_data_index += index->total_size;
-               stream->prev_index_seq = index_info.net_seq_num;
-
-               ret = try_rotate_stream_index(stream);
-               if (ret < 0) {
-                       goto end_stream_put;
-               }
-       } else if (ret > 0) {
-               /* no flush. */
-               ret = 0;
-       } else {
-               /*
-                * ret < 0
-                *
-                * relay_index_try_flush is responsible for the self-reference
-                * put of the index object on error.
-                */
-               ERR("relay_index_try_flush error %d", ret);
-               ret = -1;
-       }
 
 end_stream_put:
 
 end_stream_put:
-       pthread_mutex_unlock(&stream->lock);
        stream_put(stream);
        stream_put(stream);
-
 end:
 end:
-
        memset(&reply, 0, sizeof(reply));
        if (ret < 0) {
                reply.ret_code = htobe32(LTTNG_ERR_UNK);
        memset(&reply, 0, sizeof(reply));
        if (ret < 0) {
                reply.ret_code = htobe32(LTTNG_ERR_UNK);
@@ -2572,22 +2108,25 @@ end_no_session:
 }
 
 /*
 }
 
 /*
- * relay_rotate_session_stream: rotate a stream to a new tracefile for the session
- * rotation feature (not the tracefile rotation feature).
+ * relay_rotate_session_stream: rotate a stream to a new tracefile for the
+ * session rotation feature (not the tracefile rotation feature).
  */
  */
-static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_rotate_session_streams(
+               const struct lttcomm_relayd_hdr *recv_hdr,
                struct relay_connection *conn,
                const struct lttng_buffer_view *payload)
 {
        int ret;
                struct relay_connection *conn,
                const struct lttng_buffer_view *payload)
 {
        int ret;
+       uint32_t i;
        ssize_t send_ret;
        ssize_t send_ret;
+       enum lttng_error_code reply_code = LTTNG_ERR_UNK;
        struct relay_session *session = conn->session;
        struct relay_session *session = conn->session;
-       struct lttcomm_relayd_rotate_stream stream_info;
-       struct lttcomm_relayd_generic_reply reply;
-       struct relay_stream *stream;
-       size_t header_len;
-       size_t path_len;
-       struct lttng_buffer_view new_path_view;
+       struct lttcomm_relayd_rotate_streams rotate_streams;
+       struct lttcomm_relayd_generic_reply reply = {};
+       struct relay_stream *stream = NULL;
+       const size_t header_len = sizeof(struct lttcomm_relayd_rotate_streams);
+       struct lttng_trace_chunk *next_trace_chunk = NULL;
+       struct lttng_buffer_view stream_positions;
 
        if (!session || !conn->version_check_done) {
                ERR("Trying to rotate a stream before version check");
 
        if (!session || !conn->version_check_done) {
                ERR("Trying to rotate a stream before version check");
@@ -2601,8 +2140,6 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
                goto end_no_reply;
        }
 
                goto end_no_reply;
        }
 
-       header_len = sizeof(struct lttcomm_relayd_rotate_stream);
-
        if (payload->size < header_len) {
                ERR("Unexpected payload size in \"relay_rotate_session_stream\": expected >= %zu bytes, got %zu bytes",
                                header_len, payload->size);
        if (payload->size < header_len) {
                ERR("Unexpected payload size in \"relay_rotate_session_stream\": expected >= %zu bytes, got %zu bytes",
                                header_len, payload->size);
@@ -2610,96 +2147,88 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
                goto end_no_reply;
        }
 
                goto end_no_reply;
        }
 
-       memcpy(&stream_info, payload->data, header_len);
+       memcpy(&rotate_streams, payload->data, header_len);
 
 
-       /* Convert to host */
-       stream_info.pathname_length = be32toh(stream_info.pathname_length);
-       stream_info.stream_id = be64toh(stream_info.stream_id);
-       stream_info.new_chunk_id = be64toh(stream_info.new_chunk_id);
-       stream_info.rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num);
+       /* Convert header to host endianness. */
+       rotate_streams = (typeof(rotate_streams)) {
+               .stream_count = be32toh(rotate_streams.stream_count),
+               .new_chunk_id = (typeof(rotate_streams.new_chunk_id)) {
+                       .is_set = !!rotate_streams.new_chunk_id.is_set,
+                       .value = be64toh(rotate_streams.new_chunk_id.value),
+               }
+       };
 
 
-       path_len = stream_info.pathname_length;
-       if (payload->size < header_len + path_len) {
-               ERR("Unexpected payload size in \"relay_rotate_session_stream\" including path: expected >= %zu bytes, got %zu bytes",
-                               header_len + path_len, payload->size);
-               ret = -1;
-               goto end_no_reply;
-       }       
-       
-       /* Ensure it fits in local filename length. */
-       if (path_len >= LTTNG_PATH_MAX) {
-               ret = -ENAMETOOLONG;
-               ERR("Length of relay_rotate_session_stream command's path name (%zu bytes) exceeds the maximal allowed length of %i bytes",
-                               path_len, LTTNG_PATH_MAX);
-               goto end;
+       if (rotate_streams.new_chunk_id.is_set) {
+               /*
+                * Retrieve the trace chunk the stream must transition to. As
+                * per the protocol, this chunk should have been created
+                * before this command is received.
+                */
+               next_trace_chunk = sessiond_trace_chunk_registry_get_chunk(
+                               sessiond_trace_chunk_registry,
+                               session->sessiond_uuid, session->id,
+                               rotate_streams.new_chunk_id.value);
+               if (!next_trace_chunk) {
+                       char uuid_str[UUID_STR_LEN];
+
+                       lttng_uuid_to_str(session->sessiond_uuid, uuid_str);
+                       ERR("Unknown next trace chunk in ROTATE_STREAMS command: sessiond_uuid = {%s}, session_id = %" PRIu64
+                                       ", trace_chunk_id = %" PRIu64,
+                                       uuid_str, session->id,
+                                       rotate_streams.new_chunk_id.value);
+                       reply_code = LTTNG_ERR_INVALID_PROTOCOL;
+                       ret = -1;
+                       goto end;
+               }
        }
 
        }
 
-       new_path_view = lttng_buffer_view_from_view(payload, header_len,
-                       stream_info.pathname_length);
-
-       stream = stream_get_by_id(stream_info.stream_id);
-       if (!stream) {
+       stream_positions = lttng_buffer_view_from_view(payload,
+                       sizeof(rotate_streams), -1);
+       if (!stream_positions.data ||
+                       stream_positions.size <
+                                       (rotate_streams.stream_count *
+                                                       sizeof(struct lttcomm_relayd_stream_rotation_position))) {
+               reply_code = LTTNG_ERR_INVALID_PROTOCOL;
                ret = -1;
                goto end;
        }
 
                ret = -1;
                goto end;
        }
 
-       pthread_mutex_lock(&stream->lock);
-
-       /*
-        * Update the trace path (just the folder, the stream name does not
-        * change).
-        */
-       free(stream->prev_path_name);
-       stream->prev_path_name = stream->path_name;
-       stream->path_name = create_output_path(new_path_view.data);
-       if (!stream->path_name) {
-               ERR("Failed to create a new output path");
-               ret = -1;
-               goto end_stream_unlock;
-       }
-       ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG,
-                       -1, -1);
-       if (ret < 0) {
-               ERR("relay creating output directory");
-               ret = -1;
-               goto end_stream_unlock;
-       }
+       for (i = 0; i < rotate_streams.stream_count; i++) {
+               struct lttcomm_relayd_stream_rotation_position *position_comm =
+                               &((typeof(position_comm)) stream_positions.data)[i];
+               const struct lttcomm_relayd_stream_rotation_position pos = {
+                       .stream_id = be64toh(position_comm->stream_id),
+                       .rotate_at_seq_num = be64toh(
+                                       position_comm->rotate_at_seq_num),
+               };
 
 
-       if (stream->is_metadata) {
-               /*
-                * Metadata streams have no index; consider its rotation
-                * complete.
-                */
-               stream->index_rotated = true;
-               /*
-                * The metadata stream is sent only over the control connection
-                * so we know we have all the data to perform the stream
-                * rotation.
-                */
-               ret = do_rotate_stream_data(stream);
-       } else {
-               stream->rotate_at_seq_num = stream_info.rotate_at_seq_num;
-               ret = try_rotate_stream_data(stream);
-               if (ret < 0) {
-                       goto end_stream_unlock;
+               stream = stream_get_by_id(pos.stream_id);
+               if (!stream) {
+                       reply_code = LTTNG_ERR_INVALID;
+                       ret = -1;
+                       goto end;
                }
 
                }
 
-               ret = try_rotate_stream_index(stream);
-               if (ret < 0) {
-                       goto end_stream_unlock;
+               pthread_mutex_lock(&stream->lock);
+               ret = stream_set_pending_rotation(stream, next_trace_chunk,
+                               pos.rotate_at_seq_num);
+               pthread_mutex_unlock(&stream->lock);
+               if (ret) {
+                       reply_code = LTTNG_ERR_FILE_CREATION_ERROR;
+                       goto end;
                }
                }
+
+               stream_put(stream);
+               stream = NULL;
        }
 
        }
 
-end_stream_unlock:
-       pthread_mutex_unlock(&stream->lock);
-       stream_put(stream);
+       reply_code = LTTNG_OK;
 end:
 end:
-       memset(&reply, 0, sizeof(reply));
-       if (ret < 0) {
-               reply.ret_code = htobe32(LTTNG_ERR_UNK);
-       } else {
-               reply.ret_code = htobe32(LTTNG_OK);
+       if (stream) {
+               stream_put(stream);
        }
        }
+
+       reply.ret_code = htobe32((uint32_t) reply_code);
        send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
                        sizeof(struct lttcomm_relayd_generic_reply), 0);
        if (send_ret < (ssize_t) sizeof(reply)) {
        send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
                        sizeof(struct lttcomm_relayd_generic_reply), 0);
        if (send_ret < (ssize_t) sizeof(reply)) {
@@ -2709,6 +2238,7 @@ end:
        }
 
 end_no_reply:
        }
 
 end_no_reply:
+       lttng_trace_chunk_put(next_trace_chunk);
        return ret;
 }
 
        return ret;
 }
 
@@ -2895,8 +2425,8 @@ static int relay_create_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
        pthread_mutex_lock(&conn->session->lock);
        lttng_trace_chunk_put(conn->session->current_trace_chunk);
        conn->session->current_trace_chunk = published_chunk;
        pthread_mutex_lock(&conn->session->lock);
        lttng_trace_chunk_put(conn->session->current_trace_chunk);
        conn->session->current_trace_chunk = published_chunk;
-       pthread_mutex_unlock(&conn->session->lock);
        published_chunk = NULL;
        published_chunk = NULL;
+       pthread_mutex_unlock(&conn->session->lock);
 
 end:
        reply.ret_code = htobe32((uint32_t) reply_code);
 
 end:
        reply.ret_code = htobe32((uint32_t) reply_code);
@@ -2933,7 +2463,7 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
        enum lttng_error_code reply_code = LTTNG_OK;
        enum lttng_trace_chunk_status chunk_status;
        uint64_t chunk_id;
        enum lttng_error_code reply_code = LTTNG_OK;
        enum lttng_trace_chunk_status chunk_status;
        uint64_t chunk_id;
-       LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command;
+       LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
        time_t close_timestamp;
 
        if (!session || !conn->version_check_done) {
        time_t close_timestamp;
 
        if (!session || !conn->version_check_done) {
@@ -3001,6 +2531,21 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr,
                }
        }
 
                }
        }
 
+       pthread_mutex_lock(&session->lock);
+       if (session->current_trace_chunk == chunk) {
+               /*
+                * After a trace chunk close command, no new streams
+                * referencing the chunk may be created. Hence, on the
+                * event that no new trace chunk have been created for
+                * the session, the reference to the current trace chunk
+                * is released in order to allow it to be reclaimed when
+                * the last stream releases its reference to it.
+                */
+               lttng_trace_chunk_put(session->current_trace_chunk);
+               session->current_trace_chunk = NULL;
+       }
+       pthread_mutex_unlock(&session->lock);
+
 end:
        reply.ret_code = htobe32((uint32_t) reply_code);
        send_ret = conn->sock->ops->sendmsg(conn->sock,
 end:
        reply.ret_code = htobe32((uint32_t) reply_code);
        send_ret = conn->sock->ops->sendmsg(conn->sock,
@@ -3017,6 +2562,67 @@ end_no_reply:
        return ret;
 }
 
        return ret;
 }
 
+/*
+ * relay_trace_chunk_exists: check if a trace chunk exists
+ */
+static int relay_trace_chunk_exists(const struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn,
+               const struct lttng_buffer_view *payload)
+{
+       int ret = 0;
+       ssize_t send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_trace_chunk_exists *msg;
+       struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
+       struct lttng_buffer_view header_view;
+       struct lttng_trace_chunk *chunk = NULL;
+       uint64_t chunk_id;
+
+       if (!session || !conn->version_check_done) {
+               ERR("Trying to close a trace chunk before version check");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       if (session->major == 2 && session->minor < 11) {
+               ERR("Chunk close command is unsupported before 2.11");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
+       if (!header_view.data) {
+               ERR("Failed to receive payload of chunk close command");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       /* Convert to host endianness. */
+       msg = (typeof(msg)) header_view.data;
+       chunk_id = be64toh(msg->chunk_id);
+
+       chunk = sessiond_trace_chunk_registry_get_chunk(
+                       sessiond_trace_chunk_registry,
+                       conn->session->sessiond_uuid,
+                       conn->session->id,
+                       chunk_id);
+
+       reply = (typeof(reply)) {
+               .generic.ret_code = htobe32((uint32_t) LTTNG_OK),
+               .trace_chunk_exists = !!chunk,
+       };
+       send_ret = conn->sock->ops->sendmsg(conn->sock,
+                       &reply, sizeof(reply), 0);
+       if (send_ret < (ssize_t) sizeof(reply)) {
+               ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
+                               send_ret);
+               ret = -1;
+       }
+end_no_reply:
+       lttng_trace_chunk_put(chunk);
+       return ret;
+}
+
 #define DBG_CMD(cmd_name, conn) \
                DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd);
 
 #define DBG_CMD(cmd_name, conn) \
                DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd);
 
@@ -3079,9 +2685,9 @@ static int relay_process_control_command(struct relay_connection *conn,
                DBG_CMD("RELAYD_RESET_METADATA", conn);
                ret = relay_reset_metadata(header, conn, payload);
                break;
                DBG_CMD("RELAYD_RESET_METADATA", conn);
                ret = relay_reset_metadata(header, conn, payload);
                break;
-       case RELAYD_ROTATE_STREAM:
-               DBG_CMD("RELAYD_ROTATE_STREAM", conn);
-               ret = relay_rotate_session_stream(header, conn, payload);
+       case RELAYD_ROTATE_STREAMS:
+               DBG_CMD("RELAYD_ROTATE_STREAMS", conn);
+               ret = relay_rotate_session_streams(header, conn, payload);
                break;
        case RELAYD_CREATE_TRACE_CHUNK:
                DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn);
                break;
        case RELAYD_CREATE_TRACE_CHUNK:
                DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn);
@@ -3091,6 +2697,10 @@ static int relay_process_control_command(struct relay_connection *conn,
                DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn);
                ret = relay_close_trace_chunk(header, conn, payload);
                break;
                DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn);
                ret = relay_close_trace_chunk(header, conn, payload);
                break;
+       case RELAYD_TRACE_CHUNK_EXISTS:
+               DBG_CMD("RELAYD_TRACE_CHUNK_EXISTS", conn);
+               ret = relay_trace_chunk_exists(header, conn, payload);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", header->cmd);
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", header->cmd);
@@ -3291,106 +2901,6 @@ static enum relay_connection_status relay_process_control(
        return status;
 }
 
        return status;
 }
 
-/*
- * Handle index for a data stream.
- *
- * Called with the stream lock held.
- *
- * Return 0 on success else a negative value.
- */
-static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
-               bool rotate_index, bool *flushed, uint64_t total_size)
-{
-       int ret = 0;
-       uint64_t data_offset;
-       struct relay_index *index;
-
-       /* Get data offset because we are about to update the index. */
-       data_offset = htobe64(stream->tracefile_size_current);
-
-       DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
-                       stream->stream_handle, net_seq_num, stream->tracefile_size_current);
-
-       /*
-        * Lookup for an existing index for that stream id/sequence
-        * number. If it exists, the control thread has already received the
-        * data for it, thus we need to write it to disk.
-        */
-       index = relay_index_get_by_id_or_create(stream, net_seq_num);
-       if (!index) {
-               ret = -1;
-               goto end;
-       }
-
-       if (rotate_index || !stream->index_file) {
-               const char *stream_path;
-
-               /*
-                * The data connection creates the stream's first index file.
-                *
-                * This can happen _after_ a ROTATE_STREAM command. In
-                * other words, the data of the first packet of this stream
-                * can be received after a ROTATE_STREAM command.
-                *
-                * The ROTATE_STREAM command changes the stream's path_name
-                * to point to the "next" chunk. If a rotation is pending for
-                * this stream, as indicated by "rotate_at_seq_num != -1ULL",
-                * it means that we are still receiving data that belongs in the
-                * stream's former path.
-                *
-                * In this very specific case, we must ensure that the index
-                * file is created in the streams's former path,
-                * "prev_path_name".
-                *
-                * All other rotations beyond the first one are not affected
-                * by this problem since the actual rotation operation creates
-                * the new chunk's index file.
-                */
-               stream_path = stream->rotate_at_seq_num == -1ULL ?
-                               stream->path_name:
-                               stream->prev_path_name;
-
-               ret = create_rotate_index_file(stream, stream_path);
-               if (ret < 0) {
-                       ERR("Failed to rotate index");
-                       /* Put self-ref for this index due to error. */
-                       relay_index_put(index);
-                       index = NULL;
-                       goto end;
-               }
-       }
-
-       if (relay_index_set_file(index, stream->index_file, data_offset)) {
-               ret = -1;
-               /* Put self-ref for this index due to error. */
-               relay_index_put(index);
-               index = NULL;
-               goto end;
-       }
-
-       ret = relay_index_try_flush(index);
-       if (ret == 0) {
-               tracefile_array_commit_seq(stream->tfa);
-               stream->index_received_seqcount++;
-               *flushed = true;
-       } else if (ret > 0) {
-               index->total_size = total_size;
-               /* No flush. */
-               ret = 0;
-       } else {
-               /*
-                * ret < 0
-                *
-                * relay_index_try_flush is responsible for the self-reference
-                * put of the index object on error.
-                */
-               ERR("relay_index_try_flush error %d", ret);
-               ret = -1;
-       }
-end:
-       return ret;
-}
-
 static enum relay_connection_status relay_process_data_receive_header(
                struct relay_connection *conn)
 {
 static enum relay_connection_status relay_process_data_receive_header(
                struct relay_connection *conn)
 {
@@ -3467,40 +2977,17 @@ static enum relay_connection_status relay_process_data_receive_header(
        }
 
        pthread_mutex_lock(&stream->lock);
        }
 
        pthread_mutex_lock(&stream->lock);
-
-       /* Check if a rotation is needed. */
-       if (stream->tracefile_size > 0 &&
-                       (stream->tracefile_size_current + header.data_size) >
-                       stream->tracefile_size) {
-               uint64_t old_id, new_id;
-
-               old_id = tracefile_array_get_file_index_head(stream->tfa);
-               tracefile_array_file_rotate(stream->tfa);
-
-               /* new_id is updated by utils_rotate_stream_file. */
-               new_id = old_id;
-
-               ret = utils_rotate_stream_file(stream->path_name,
-                               stream->channel_name, stream->tracefile_size,
-                               stream->tracefile_count, -1,
-                               -1, stream->stream_fd->fd,
-                               &new_id, &stream->stream_fd->fd);
-               if (ret < 0) {
-                       ERR("Failed to rotate stream output file");
-                       status = RELAY_CONNECTION_STATUS_ERROR;
-                       goto end_stream_unlock;
-               }
-
-               /*
-                * Reset current size because we just performed a stream
-                * rotation.
-                */
-               stream->tracefile_size_current = 0;
-               conn->protocol.data.state.receive_payload.rotate_index = true;
+       /* Prepare stream for the reception of a new packet. */
+       ret = stream_init_packet(stream, header.data_size,
+                       &conn->protocol.data.state.receive_payload.rotate_index);
+       pthread_mutex_unlock(&stream->lock);
+       if (ret) {
+               ERR("Failed to rotate stream output file");
+               status = RELAY_CONNECTION_STATUS_ERROR;
+               goto end_stream_unlock;
        }
 
 end_stream_unlock:
        }
 
 end_stream_unlock:
-       pthread_mutex_unlock(&stream->lock);
        stream_put(stream);
 end:
        return status;
        stream_put(stream);
 end:
        return status;
@@ -3551,8 +3038,8 @@ static enum relay_connection_status relay_process_data_receive_payload(
         *   - the on-stack data buffer
         */
        while (left_to_receive > 0 && !partial_recv) {
         *   - the on-stack data buffer
         */
        while (left_to_receive > 0 && !partial_recv) {
-               ssize_t write_ret;
                size_t recv_size = min(left_to_receive, chunk_size);
                size_t recv_size = min(left_to_receive, chunk_size);
+               struct lttng_buffer_view packet_chunk;
 
                ret = conn->sock->ops->recvmsg(conn->sock, data_buffer,
                                recv_size, MSG_DONTWAIT);
 
                ret = conn->sock->ops->recvmsg(conn->sock, data_buffer,
                                recv_size, MSG_DONTWAIT);
@@ -3574,14 +3061,15 @@ static enum relay_connection_status relay_process_data_receive_payload(
                         * consumed.
                         */
                        partial_recv = true;
                         * consumed.
                         */
                        partial_recv = true;
+                       recv_size = ret;
                }
 
                }
 
-               recv_size = ret;
+               packet_chunk = lttng_buffer_view_init(data_buffer,
+                               0, recv_size);
+               assert(packet_chunk.data);
 
 
-               /* Write data to stream output fd. */
-               write_ret = lttng_write(stream->stream_fd->fd, data_buffer,
-                               recv_size);
-               if (write_ret < (ssize_t) recv_size) {
+               ret = stream_write(stream, &packet_chunk, 0);
+               if (ret) {
                        ERR("Relay error writing data to file");
                        status = RELAY_CONNECTION_STATUS_ERROR;
                        goto end_stream_unlock;
                        ERR("Relay error writing data to file");
                        status = RELAY_CONNECTION_STATUS_ERROR;
                        goto end_stream_unlock;
@@ -3590,9 +3078,6 @@ static enum relay_connection_status relay_process_data_receive_payload(
                left_to_receive -= recv_size;
                state->received += recv_size;
                state->left_to_receive = left_to_receive;
                left_to_receive -= recv_size;
                state->received += recv_size;
                state->left_to_receive = left_to_receive;
-
-               DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
-                               write_ret, stream->stream_handle);
        }
 
        if (state->left_to_receive > 0) {
        }
 
        if (state->left_to_receive > 0) {
@@ -3606,22 +3091,18 @@ static enum relay_connection_status relay_process_data_receive_payload(
                goto end_stream_unlock;
        }
 
                goto end_stream_unlock;
        }
 
-       ret = write_padding_to_file(stream->stream_fd->fd,
-                       state->header.padding_size);
-       if ((int64_t) ret < (int64_t) state->header.padding_size) {
-               ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
-                               stream->stream_handle,
-                               state->header.net_seq_num, ret);
+       ret = stream_write(stream, NULL, state->header.padding_size);
+       if (ret) {
                status = RELAY_CONNECTION_STATUS_ERROR;
                goto end_stream_unlock;
        }
 
                status = RELAY_CONNECTION_STATUS_ERROR;
                goto end_stream_unlock;
        }
 
-
        if (session_streams_have_index(session)) {
        if (session_streams_have_index(session)) {
-               ret = handle_index_data(stream, state->header.net_seq_num,
-                               state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size);
+               ret = stream_update_index(stream, state->header.net_seq_num,
+                               state->rotate_index, &index_flushed,
+                               state->header.data_size + state->header.padding_size);
                if (ret < 0) {
                if (ret < 0) {
-                       ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+                       ERR("Failed to update index: stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
                                        stream->stream_handle,
                                        state->header.net_seq_num, ret);
                        status = RELAY_CONNECTION_STATUS_ERROR;
                                        stream->stream_handle,
                                        state->header.net_seq_num, ret);
                        status = RELAY_CONNECTION_STATUS_ERROR;
@@ -3629,23 +3110,17 @@ static enum relay_connection_status relay_process_data_receive_payload(
                }
        }
 
                }
        }
 
-       stream->tracefile_size_current += state->header.data_size +
-                       state->header.padding_size;
-
        if (stream->prev_data_seq == -1ULL) {
                new_stream = true;
        }
        if (stream->prev_data_seq == -1ULL) {
                new_stream = true;
        }
-       if (index_flushed) {
-               stream->pos_after_last_complete_data_index =
-                               stream->tracefile_size_current;
-               stream->prev_index_seq = state->header.net_seq_num;
-               ret = try_rotate_stream_index(stream);
-               if (ret < 0) {
-                       goto end_stream_unlock;
-               }
-       }
 
 
-       stream->prev_data_seq = state->header.net_seq_num;
+       ret = stream_complete_packet(stream, state->header.data_size +
+                       state->header.padding_size, state->header.net_seq_num,
+                       index_flushed);
+       if (ret) {
+               status = RELAY_CONNECTION_STATUS_ERROR;
+               goto end_stream_unlock;
+       }
 
        /*
         * Resetting the protocol state (to RECEIVE_HEADER) will trash the
 
        /*
         * Resetting the protocol state (to RECEIVE_HEADER) will trash the
@@ -3655,12 +3130,6 @@ static enum relay_connection_status relay_process_data_receive_payload(
        connection_reset_protocol_state(conn);
        state = NULL;
 
        connection_reset_protocol_state(conn);
        state = NULL;
 
-       ret = try_rotate_stream_data(stream);
-       if (ret < 0) {
-               status = RELAY_CONNECTION_STATUS_ERROR;
-               goto end_stream_unlock;
-       }
-
 end_stream_unlock:
        close_requested = stream->close_requested;
        pthread_mutex_unlock(&stream->lock);
 end_stream_unlock:
        close_requested = stream->close_requested;
        pthread_mutex_unlock(&stream->lock);
index fa211fe02bf6633bb60ffbacebdb4b2c95352408..0997fdd6579469098f0db94eb8d31fbfc0ef183c 100644 (file)
@@ -244,11 +244,10 @@ static void destroy_session(struct relay_session *session)
        ret = session_delete(session);
        assert(!ret);
        lttng_trace_chunk_put(session->current_trace_chunk);
        ret = session_delete(session);
        assert(!ret);
        lttng_trace_chunk_put(session->current_trace_chunk);
+       session->current_trace_chunk = NULL;
        ret = sessiond_trace_chunk_registry_session_destroyed(
                        sessiond_trace_chunk_registry, session->sessiond_uuid);
        assert(!ret);
        ret = sessiond_trace_chunk_registry_session_destroyed(
                        sessiond_trace_chunk_registry, session->sessiond_uuid);
        assert(!ret);
-       lttng_trace_chunk_put(session->current_trace_chunk);
-       session->current_trace_chunk = NULL;
        call_rcu(&session->rcu_node, rcu_destroy_session);
 }
 
        call_rcu(&session->rcu_node, rcu_destroy_session);
 }
 
index 31fbb31891dffc03c611bca7489fbb332bcbe281..01d5b3c2d695583f6e6b09ee30ed4aafff2ab623 100644 (file)
@@ -112,7 +112,6 @@ void trace_chunk_registry_ht_element_free(struct rcu_head *node)
        struct trace_chunk_registry_ht_element *element =
                        container_of(node, typeof(*element), rcu_node);
 
        struct trace_chunk_registry_ht_element *element =
                        container_of(node, typeof(*element), rcu_node);
 
-       lttng_trace_chunk_registry_destroy(element->trace_chunk_registry);
        free(element);
 }
 
        free(element);
 }
 
@@ -136,6 +135,7 @@ void trace_chunk_registry_ht_element_release(struct urcu_ref *ref)
                element->sessiond_trace_chunk_registry = NULL;
        }
 
                element->sessiond_trace_chunk_registry = NULL;
        }
 
+       lttng_trace_chunk_registry_destroy(element->trace_chunk_registry);
        /* Defered reclaim of the object */
        call_rcu(&element->rcu_node, trace_chunk_registry_ht_element_free);
 }
        /* Defered reclaim of the object */
        call_rcu(&element->rcu_node, trace_chunk_registry_ht_element_free);
 }
@@ -399,6 +399,19 @@ struct lttng_trace_chunk *sessiond_trace_chunk_registry_publish_chunk(
 
         published_chunk = lttng_trace_chunk_registry_publish_chunk(
                        element->trace_chunk_registry, session_id, new_chunk);
 
         published_chunk = lttng_trace_chunk_registry_publish_chunk(
                        element->trace_chunk_registry, session_id, new_chunk);
+       /*
+        * At this point, two references to the published chunks exist. One
+        * is taken by the registry while the other is being returned to the
+        * caller. In the use case of the relay daemon, the reference held
+        * by the registry itself is undesirable.
+        *
+        * We want the trace chunk to be removed from the registry as soon
+        * as it is not being used by the relay daemon (through a session
+        * or a stream). This differs from the behaviour of the consumer
+        * daemon which relies on an explicit command from the session
+        * daemon to release the registry's reference.
+        */
+       lttng_trace_chunk_put(published_chunk);
 end:
        trace_chunk_registry_ht_element_put(element);
        return published_chunk;
 end:
        trace_chunk_registry_ht_element_put(element);
        return published_chunk;
index 6650700838fc27596e8c8f086cc7815c8a14a098..d11e436a9587019c54920841013fb9fad249088c 100644 (file)
@@ -2,6 +2,7 @@
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
  *                      David Goulet <dgoulet@efficios.com>
  *               2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
  *                      David Goulet <dgoulet@efficios.com>
  *               2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *               2019 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -21,6 +22,7 @@
 #include <common/common.h>
 #include <common/utils.h>
 #include <common/defaults.h>
 #include <common/common.h>
 #include <common/utils.h>
 #include <common/defaults.h>
+#include <common/sessiond-comm/relayd.h>
 #include <urcu/rculist.h>
 #include <sys/stat.h>
 
 #include <urcu/rculist.h>
 #include <sys/stat.h>
 
@@ -32,6 +34,8 @@
 #include <sys/types.h>
 #include <fcntl.h>
 
 #include <sys/types.h>
 #include <fcntl.h>
 
+#define FILE_IO_STACK_BUFFER_SIZE              65536
+
 /* Should be called with RCU read-side lock held. */
 bool stream_get(struct relay_stream *stream)
 {
 /* Should be called with RCU read-side lock held. */
 bool stream_get(struct relay_stream *stream)
 {
@@ -65,40 +69,196 @@ end:
        return stream;
 }
 
        return stream;
 }
 
-static int stream_create_data_output_file(struct relay_stream *stream)
+static void stream_complete_rotation(struct relay_stream *stream)
+{
+       DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
+       lttng_trace_chunk_put(stream->trace_chunk);
+       stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
+       stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
+}
+
+/*
+ * If too much data has been written in a tracefile before we received the
+ * rotation command, we have to move the excess data to the new tracefile and
+ * perform the rotation. This can happen because the control and data
+ * connections are separate, the indexes as well as the commands arrive from
+ * the control connection and we have no control over the order so we could be
+ * in a situation where too much data has been received on the data connection
+ * before the rotation command on the control connection arrives.
+ */
+static int rotate_truncate_stream(struct relay_stream *stream)
+{
+       int ret, new_fd;
+       off_t lseek_ret;
+       uint64_t diff, pos = 0;
+       char buf[FILE_IO_STACK_BUFFER_SIZE];
+
+       assert(!stream->is_metadata);
+
+       assert(stream->tracefile_size_current >
+                       stream->pos_after_last_complete_data_index);
+       diff = stream->tracefile_size_current -
+                       stream->pos_after_last_complete_data_index;
+
+       /* Create the new tracefile. */
+       new_fd = utils_create_stream_file(stream->path_name,
+                       stream->channel_name,
+                       stream->tracefile_size, stream->tracefile_count,
+                       /* uid */ -1, /* gid */ -1, /* suffix */ NULL);
+       if (new_fd < 0) {
+               ERR("Failed to create new stream file at path %s for channel %s",
+                               stream->path_name, stream->channel_name);
+               ret = -1;
+               goto end;
+       }
+
+       /*
+        * Rewind the current tracefile to the position at which the rotation
+        * should have occurred.
+        */
+       lseek_ret = lseek(stream->stream_fd->fd,
+                       stream->pos_after_last_complete_data_index, SEEK_SET);
+       if (lseek_ret < 0) {
+               PERROR("seek truncate stream");
+               ret = -1;
+               goto end;
+       }
+
+       /* Move data from the old file to the new file. */
+       while (pos < diff) {
+               uint64_t count, bytes_left;
+               ssize_t io_ret;
+
+               bytes_left = diff - pos;
+               count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left;
+               assert(count <= SIZE_MAX);
+
+               io_ret = lttng_read(stream->stream_fd->fd, buf, count);
+               if (io_ret < (ssize_t) count) {
+                       char error_string[256];
+
+                       snprintf(error_string, sizeof(error_string),
+                                       "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
+                                       count, stream->stream_fd->fd, io_ret);
+                       if (io_ret == -1) {
+                               PERROR("%s", error_string);
+                       } else {
+                               ERR("%s", error_string);
+                       }
+                       ret = -1;
+                       goto end;
+               }
+
+               io_ret = lttng_write(new_fd, buf, count);
+               if (io_ret < (ssize_t) count) {
+                       char error_string[256];
+
+                       snprintf(error_string, sizeof(error_string),
+                                       "Failed to write %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
+                                       count, new_fd, io_ret);
+                       if (io_ret == -1) {
+                               PERROR("%s", error_string);
+                       } else {
+                               ERR("%s", error_string);
+                       }
+                       ret = -1;
+                       goto end;
+               }
+
+               pos += count;
+       }
+
+       /* Truncate the file to get rid of the excess data. */
+       ret = ftruncate(stream->stream_fd->fd,
+                       stream->pos_after_last_complete_data_index);
+       if (ret) {
+               PERROR("ftruncate");
+               goto end;
+       }
+
+       ret = close(stream->stream_fd->fd);
+       if (ret < 0) {
+               PERROR("Closing tracefile");
+               goto end;
+       }
+
+       /*
+        * Update the offset and FD of all the eventual indexes created by the
+        * data connection before the rotation command arrived.
+        */
+       ret = relay_index_switch_all_files(stream);
+       if (ret < 0) {
+               ERR("Failed to rotate index file");
+               goto end;
+       }
+
+       stream->stream_fd->fd = new_fd;
+       stream->tracefile_size_current = diff;
+       stream->pos_after_last_complete_data_index = 0;
+       stream_complete_rotation(stream);
+
+       ret = 0;
+
+end:
+       return ret;
+}
+
+static int stream_create_data_output_file_from_trace_chunk(
+               struct relay_stream *stream,
+               struct lttng_trace_chunk *trace_chunk,
+               bool force_unlink,
+               struct stream_fd **out_stream_fd)
 {
        int ret, fd;
 {
        int ret, fd;
+       char stream_path[LTTNG_PATH_MAX];
        enum lttng_trace_chunk_status status;
        const int flags = O_RDWR | O_CREAT | O_TRUNC;
        const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
        enum lttng_trace_chunk_status status;
        const int flags = O_RDWR | O_CREAT | O_TRUNC;
        const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
-       char stream_path[LTTNG_PATH_MAX];
 
        ASSERT_LOCKED(stream->lock);
        assert(stream->trace_chunk);
 
 
        ASSERT_LOCKED(stream->lock);
        assert(stream->trace_chunk);
 
-       if (stream->stream_fd) {
-               stream_fd_put(stream->stream_fd);
-               stream->stream_fd = NULL;
-       }
-
        ret = utils_stream_file_path(stream->path_name, stream->channel_name,
        ret = utils_stream_file_path(stream->path_name, stream->channel_name,
-                       stream->tracefile_size, stream->tracefile_count, NULL,
-                       stream_path, sizeof(stream_path));
+                       stream->tracefile_size, stream->tracefile_current_index,
+                       NULL, stream_path, sizeof(stream_path));
        if (ret < 0) {
                goto end;
        }
 
        if (ret < 0) {
                goto end;
        }
 
-       DBG("Opening stream output file \"%s\"", stream_path);
+       if (stream->tracefile_wrapped_around || force_unlink) {
+               /*
+                * The on-disk ring-buffer has wrapped around.
+                * Newly created stream files will replace existing files. Since
+                * live clients may be consuming existing files, the file about
+                * to be replaced is unlinked in order to not overwrite its
+                * content.
+                */
+               status = lttng_trace_chunk_unlink_file(trace_chunk,
+                               stream_path);
+               if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       PERROR("Failed to unlink stream file \"%s\" during trace file rotation",
+                                       stream_path);
+                       /*
+                        * Don't abort if the file doesn't exist, it is
+                        * unexpected, but should not be a fatal error.
+                        */
+                       if (errno != ENOENT) {
+                               ret = -1;
+                               goto end;
+                       }
+               }
+       }
+
        status = lttng_trace_chunk_open_file(
        status = lttng_trace_chunk_open_file(
-                       stream->trace_chunk, stream_path, flags, mode, &fd);
+                       trace_chunk, stream_path, flags, mode, &fd);
        if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ERR("Failed to open stream file \"%s\"", stream->channel_name);
                ret = -1;
                goto end;
        }
 
        if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ERR("Failed to open stream file \"%s\"", stream->channel_name);
                ret = -1;
                goto end;
        }
 
-       stream->stream_fd = stream_fd_create(fd);
-       if (!stream->stream_fd) {
+       *out_stream_fd = stream_fd_create(fd);
+       if (!*out_stream_fd) {
                if (close(ret)) {
                        PERROR("Error closing stream file descriptor %d", ret);
                }
                if (close(ret)) {
                        PERROR("Error closing stream file descriptor %d", ret);
                }
@@ -109,14 +269,222 @@ end:
        return ret;
 }
 
        return ret;
 }
 
+static int stream_rotate_data_file(struct relay_stream *stream)
+{
+       int ret = 0;
+
+       DBG("Rotating stream %" PRIu64 " data file",
+                       stream->stream_handle);
+
+       if (stream->stream_fd) {
+               stream_fd_put(stream->stream_fd);
+               stream->stream_fd = NULL;
+       }
+
+       stream->tracefile_wrapped_around = false;
+       stream->tracefile_current_index = 0;
+
+       if (stream->ongoing_rotation.value.next_trace_chunk) {
+               struct stream_fd *new_stream_fd = NULL;
+               enum lttng_trace_chunk_status chunk_status;
+
+               chunk_status = lttng_trace_chunk_create_subdirectory(
+                               stream->ongoing_rotation.value.next_trace_chunk,
+                               stream->path_name);
+               if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       ret = -1;
+                       goto end;
+               }
+
+               /* Rotate the data file. */
+               ret = stream_create_data_output_file_from_trace_chunk(stream,
+                               stream->ongoing_rotation.value.next_trace_chunk,
+                               false, &new_stream_fd);
+               stream->stream_fd = new_stream_fd;
+               if (ret < 0) {
+                       ERR("Failed to rotate stream data file");
+                       goto end;
+               }
+       }
+       stream->tracefile_size_current = 0;
+       stream->pos_after_last_complete_data_index = 0;
+       stream->ongoing_rotation.value.data_rotated = true;
+
+       if (stream->ongoing_rotation.value.index_rotated) {
+               /* Rotation completed; reset its state. */
+               stream_complete_rotation(stream);
+       }
+end:
+       return ret;
+}
+
+/*
+ * Check if a stream's data file (as opposed to index) should be rotated
+ * (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static int try_rotate_stream_data(struct relay_stream *stream)
+{
+       int ret = 0;
+
+       if (caa_likely(!stream->ongoing_rotation.is_set)) {
+               /* No rotation expected. */
+               goto end;
+       }
+
+       if (stream->ongoing_rotation.value.data_rotated) {
+               /* Rotation of the data file has already occurred. */
+               goto end;
+       }
+
+       if (stream->prev_data_seq == -1ULL ||
+                       stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) {
+               /*
+                * The next packet that will be written is not part of the next
+                * chunk yet.
+                */
+               DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64
+                               ", prev_data_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->ongoing_rotation.value.seq_num,
+                               stream->prev_data_seq);
+               goto end;
+       } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) {
+               /*
+                * prev_data_seq is checked here since indexes and rotation
+                * commands are serialized with respect to each other.
+                */
+               DBG("Rotation after too much data has been written in tracefile "
+                               "for stream %" PRIu64 ", need to truncate before "
+                               "rotating", stream->stream_handle);
+               ret = rotate_truncate_stream(stream);
+               if (ret) {
+                       ERR("Failed to truncate stream");
+                       goto end;
+               }
+       } else {
+               ret = stream_rotate_data_file(stream);
+       }
+
+end:
+       return ret;
+}
+
+/*
+ * Close the current index file if it is open, and create a new one.
+ *
+ * Return 0 on success, -1 on error.
+ */
+static int create_index_file(struct relay_stream *stream,
+               struct lttng_trace_chunk *chunk)
+{
+       int ret;
+       uint32_t major, minor;
+       char *index_subpath = NULL;
+
+       ASSERT_LOCKED(stream->lock);
+
+       /* Put ref on previous index_file. */
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
+       }
+       major = stream->trace->session->major;
+       minor = stream->trace->session->minor;
+
+       if (!chunk) {
+               ret = 0;
+               goto end;
+       }
+       ret = asprintf(&index_subpath, "%s/%s", stream->path_name,
+                       DEFAULT_INDEX_DIR);
+       if (ret < 0) {
+               goto end;
+       }
+
+       ret = lttng_trace_chunk_create_subdirectory(chunk,
+                       index_subpath);
+       free(index_subpath);
+       if (ret) {
+               goto end;
+       }
+       stream->index_file = lttng_index_file_create_from_trace_chunk(
+                       chunk, stream->path_name,
+                       stream->channel_name, stream->tracefile_size,
+                       stream->tracefile_current_index,
+                       lttng_to_index_major(major, minor),
+                       lttng_to_index_minor(major, minor), true);
+       if (!stream->index_file) {
+               ret = -1;
+               goto end;
+       }
+
+       ret = 0;
+
+end:
+       return ret;
+}
+
+/*
+ * Check if a stream's index file should be rotated (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static int try_rotate_stream_index(struct relay_stream *stream)
+{
+       int ret = 0;
+
+       if (!stream->ongoing_rotation.is_set) {
+               /* No rotation expected. */
+               goto end;
+       }
+
+       if (stream->ongoing_rotation.value.index_rotated) {
+               /* Rotation of the index has already occurred. */
+               goto end;
+       }
+
+       if (stream->prev_index_seq == -1ULL ||
+                       stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) {
+               DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+                               stream->stream_handle,
+                               stream->ongoing_rotation.value.seq_num,
+                               stream->prev_index_seq);
+               goto end;
+       } else {
+               /* The next index belongs to the new trace chunk; rotate. */
+               assert(stream->prev_index_seq + 1 ==
+                               stream->ongoing_rotation.value.seq_num);
+               DBG("Rotating stream %" PRIu64 " index file",
+                               stream->stream_handle);
+               ret = create_index_file(stream,
+                               stream->ongoing_rotation.value.next_trace_chunk);
+               stream->ongoing_rotation.value.index_rotated = true;
+
+               if (stream->ongoing_rotation.value.data_rotated &&
+                               stream->ongoing_rotation.value.index_rotated) {
+                       /* Rotation completed; reset its state. */
+                       DBG("Rotation completed for stream %" PRIu64,
+                                       stream->stream_handle);
+                       stream_complete_rotation(stream);
+               }
+       }
+
+end:
+       return ret;
+}
+
 static int stream_set_trace_chunk(struct relay_stream *stream,
                struct lttng_trace_chunk *chunk)
 {
        int ret = 0;
        enum lttng_trace_chunk_status status;
        bool acquired_reference;
 static int stream_set_trace_chunk(struct relay_stream *stream,
                struct lttng_trace_chunk *chunk)
 {
        int ret = 0;
        enum lttng_trace_chunk_status status;
        bool acquired_reference;
+       struct stream_fd *new_stream_fd = NULL;
 
 
-       pthread_mutex_lock(&stream->lock);
        status = lttng_trace_chunk_create_subdirectory(chunk,
                        stream->path_name);
        if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
        status = lttng_trace_chunk_create_subdirectory(chunk,
                        stream->path_name);
        if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
@@ -128,9 +496,15 @@ static int stream_set_trace_chunk(struct relay_stream *stream,
        acquired_reference = lttng_trace_chunk_get(chunk);
        assert(acquired_reference);
        stream->trace_chunk = chunk;
        acquired_reference = lttng_trace_chunk_get(chunk);
        assert(acquired_reference);
        stream->trace_chunk = chunk;
-       ret = stream_create_data_output_file(stream);
+
+       if (stream->stream_fd) {
+               stream_fd_put(stream->stream_fd);
+               stream->stream_fd = NULL;
+       }
+       ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
+                       false, &new_stream_fd);
+       stream->stream_fd = new_stream_fd;
 end:
 end:
-       pthread_mutex_unlock(&stream->lock);
        return ret;
 }
 
        return ret;
 }
 
@@ -162,9 +536,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
        stream->tracefile_size = tracefile_size;
        stream->tracefile_count = tracefile_count;
        stream->path_name = path_name;
        stream->tracefile_size = tracefile_size;
        stream->tracefile_count = tracefile_count;
        stream->path_name = path_name;
-       stream->prev_path_name = NULL;
        stream->channel_name = channel_name;
        stream->channel_name = channel_name;
-       stream->rotate_at_seq_num = -1ULL;
        stream->beacon_ts_end = -1ULL;
        lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
        pthread_mutex_init(&stream->lock, NULL);
        stream->beacon_ts_end = -1ULL;
        lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
        pthread_mutex_init(&stream->lock, NULL);
@@ -192,7 +564,9 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
                goto end;
        }
 
                goto end;
        }
 
+       pthread_mutex_lock(&stream->lock);
        ret = stream_set_trace_chunk(stream, current_trace_chunk);
        ret = stream_set_trace_chunk(stream, current_trace_chunk);
+       pthread_mutex_unlock(&stream->lock);
        if (ret) {
                ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
                                trace->session->session_name,
        if (ret) {
                ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
                                trace->session->session_name,
@@ -319,7 +693,6 @@ static void stream_destroy(struct relay_stream *stream)
                tracefile_array_destroy(stream->tfa);
        }
        free(stream->path_name);
                tracefile_array_destroy(stream->tfa);
        }
        free(stream->path_name);
-       free(stream->prev_path_name);
        free(stream->channel_name);
        free(stream);
 }
        free(stream->channel_name);
        free(stream);
 }
@@ -368,6 +741,7 @@ static void stream_release(struct urcu_ref *ref)
                ctf_trace_put(stream->trace);
                stream->trace = NULL;
        }
                ctf_trace_put(stream->trace);
                stream->trace = NULL;
        }
+       stream_complete_rotation(stream);
        lttng_trace_chunk_put(stream->trace_chunk);
        stream->trace_chunk = NULL;
 
        lttng_trace_chunk_put(stream->trace_chunk);
        stream->trace_chunk = NULL;
 
@@ -376,20 +750,63 @@ static void stream_release(struct urcu_ref *ref)
 
 void stream_put(struct relay_stream *stream)
 {
 
 void stream_put(struct relay_stream *stream)
 {
-       DBG("stream put for stream id %" PRIu64, stream->stream_handle);
        rcu_read_lock();
        assert(stream->ref.refcount != 0);
        /*
         * Wait until we have processed all the stream packets before
         * actually putting our last stream reference.
         */
        rcu_read_lock();
        assert(stream->ref.refcount != 0);
        /*
         * Wait until we have processed all the stream packets before
         * actually putting our last stream reference.
         */
-       DBG("stream put stream id %" PRIu64 " refcount %d",
-                       stream->stream_handle,
-                       (int) stream->ref.refcount);
        urcu_ref_put(&stream->ref, stream_release);
        rcu_read_unlock();
 }
 
        urcu_ref_put(&stream->ref, stream_release);
        rcu_read_unlock();
 }
 
+int stream_set_pending_rotation(struct relay_stream *stream,
+               struct lttng_trace_chunk *next_trace_chunk,
+               uint64_t rotation_sequence_number)
+{
+       int ret = 0;
+       const struct relay_stream_rotation rotation = {
+               .seq_num = rotation_sequence_number,
+               .next_trace_chunk = next_trace_chunk,
+       };
+
+       if (stream->ongoing_rotation.is_set) {
+               ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
+               ret = -1;
+               goto end;
+       }
+
+       if (next_trace_chunk) {
+               const bool reference_acquired =
+                               lttng_trace_chunk_get(next_trace_chunk);
+
+               assert(reference_acquired);
+       }
+       LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
+
+       DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64,
+                       stream->stream_handle, rotation_sequence_number);
+       if (stream->is_metadata) {
+               /*
+                * A metadata stream has no index; consider it already rotated.
+                */
+               stream->ongoing_rotation.value.index_rotated = true;
+               ret = stream_rotate_data_file(stream);
+       } else {
+               ret = try_rotate_stream_data(stream);
+               if (ret < 0) {
+                       goto end;
+               }
+
+               ret = try_rotate_stream_index(stream);
+               if (ret < 0) {
+                       goto end;
+               }
+       }
+end:
+       return ret;
+}
+
 void try_stream_close(struct relay_stream *stream)
 {
        bool session_aborted;
 void try_stream_close(struct relay_stream *stream)
 {
        bool session_aborted;
@@ -478,6 +895,292 @@ void try_stream_close(struct relay_stream *stream)
        stream_put(stream);
 }
 
        stream_put(stream);
 }
 
+int stream_init_packet(struct relay_stream *stream, size_t packet_size,
+               bool *file_rotated)
+{
+       int ret = 0;
+
+       ASSERT_LOCKED(stream->lock);
+       if (caa_likely(stream->tracefile_size == 0)) {
+               /* No size limit set; nothing to check. */
+               goto end;
+       }
+
+       /*
+        * Check if writing the new packet would exceed the maximal file size.
+        */
+       if (caa_unlikely((stream->tracefile_size_current + packet_size) >
+                       stream->tracefile_size)) {
+               const uint64_t new_file_index =
+                               (stream->tracefile_current_index + 1) %
+                               stream->tracefile_count;
+
+               if (new_file_index < stream->tracefile_current_index) {
+                       stream->tracefile_wrapped_around = true;
+               }
+               DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
+                               ", current_file_size = %" PRIu64
+                               ", packet_size = %" PRIu64 ", current_file_index = %" PRIu64
+                               " new_file_index = %" PRIu64,
+                               stream->stream_handle,
+                               stream->tracefile_size_current, packet_size,
+                               stream->tracefile_current_index, new_file_index);
+               tracefile_array_file_rotate(stream->tfa);
+               stream->tracefile_current_index = new_file_index;
+
+               if (stream->stream_fd) {
+                       stream_fd_put(stream->stream_fd);
+                       stream->stream_fd = NULL;
+               }
+               ret = stream_create_data_output_file_from_trace_chunk(stream,
+                               stream->trace_chunk, false, &stream->stream_fd);
+               if (ret) {
+                       ERR("Failed to perform trace file rotation of stream %" PRIu64,
+                                       stream->stream_handle);
+                       goto end;
+               }
+
+               /*
+                * Reset current size because we just performed a stream
+                * rotation.
+                */
+               stream->tracefile_size_current = 0;
+               *file_rotated = true;
+       } else {
+               *file_rotated = false;
+       }
+end:
+       return ret;
+}
+
+/* Note that the packet is not necessarily complete. */
+int stream_write(struct relay_stream *stream,
+               const struct lttng_buffer_view *packet, size_t padding_len)
+{
+       int ret = 0;
+       ssize_t write_ret;
+       size_t padding_to_write = padding_len;
+       char padding_buffer[FILE_IO_STACK_BUFFER_SIZE];
+
+       ASSERT_LOCKED(stream->lock);
+       memset(padding_buffer, 0,
+                       min(sizeof(padding_buffer), padding_to_write));
+
+       if (packet) {
+               write_ret = lttng_write(stream->stream_fd->fd,
+                               packet->data, packet->size);
+               if (write_ret != packet->size) {
+                       PERROR("Failed to write to stream file of %sstream %" PRIu64,
+                                       stream->is_metadata ? "metadata " : "",
+                                       stream->stream_handle);
+                       ret = -1;
+                       goto end;
+               }
+       }
+
+       while (padding_to_write > 0) {
+               const size_t padding_to_write_this_pass =
+                               min(padding_to_write, sizeof(padding_buffer));
+
+               write_ret = lttng_write(stream->stream_fd->fd,
+                               padding_buffer, padding_to_write_this_pass);
+               if (write_ret != padding_to_write_this_pass) {
+                       PERROR("Failed to write padding to file of %sstream %" PRIu64,
+                                       stream->is_metadata ? "metadata " : "",
+                                       stream->stream_handle);
+                       ret = -1;
+                       goto end;
+               }
+               padding_to_write -= padding_to_write_this_pass;
+       }
+
+       if (stream->is_metadata) {
+               stream->metadata_received += packet->size + padding_len;
+       }
+
+       DBG("Wrote to %sstream %" PRIu64 ": data_length = %" PRIu64 ", padding_length = %" PRIu64,
+                       stream->is_metadata ? "metadata " : "",
+                       stream->stream_handle,
+                       packet ? packet->size : 0, padding_len);
+end:
+       return ret;
+}
+
+/*
+ * Update index after receiving a packet for a data stream.
+ *
+ * Called with the stream lock held.
+ *
+ * Return 0 on success else a negative value.
+ */
+int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
+               bool rotate_index, bool *flushed, uint64_t total_size)
+{
+       int ret = 0;
+       uint64_t data_offset;
+       struct relay_index *index;
+
+       ASSERT_LOCKED(stream->lock);
+       /* Get data offset because we are about to update the index. */
+       data_offset = htobe64(stream->tracefile_size_current);
+
+       DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
+                       stream->stream_handle, net_seq_num, stream->tracefile_size_current);
+
+       /*
+        * Lookup for an existing index for that stream id/sequence
+        * number. If it exists, the control thread has already received the
+        * data for it, thus we need to write it to disk.
+        */
+       index = relay_index_get_by_id_or_create(stream, net_seq_num);
+       if (!index) {
+               ret = -1;
+               goto end;
+       }
+
+       if (rotate_index || !stream->index_file) {
+               ret = create_index_file(stream, stream->trace_chunk);
+               if (ret) {
+                       ERR("Failed to create index file for stream %" PRIu64,
+                                       stream->stream_handle);
+                       /* Put self-ref for this index due to error. */
+                       relay_index_put(index);
+                       index = NULL;
+                       goto end;
+               }
+       }
+
+       if (relay_index_set_file(index, stream->index_file, data_offset)) {
+               ret = -1;
+               /* Put self-ref for this index due to error. */
+               relay_index_put(index);
+               index = NULL;
+               goto end;
+       }
+
+       ret = relay_index_try_flush(index);
+       if (ret == 0) {
+               tracefile_array_commit_seq(stream->tfa);
+               stream->index_received_seqcount++;
+               *flushed = true;
+       } else if (ret > 0) {
+               index->total_size = total_size;
+               /* No flush. */
+               ret = 0;
+       } else {
+               /*
+                * ret < 0
+                *
+                * relay_index_try_flush is responsible for the self-reference
+                * put of the index object on error.
+                */
+               ERR("relay_index_try_flush error %d", ret);
+               ret = -1;
+       }
+end:
+       return ret;
+}
+
+int stream_complete_packet(struct relay_stream *stream, size_t packet_total_size,
+               uint64_t sequence_number, bool index_flushed)
+{
+       int ret = 0;
+
+       ASSERT_LOCKED(stream->lock);
+
+       stream->tracefile_size_current += packet_total_size;
+       if (index_flushed) {
+               stream->pos_after_last_complete_data_index =
+                               stream->tracefile_size_current;
+               stream->prev_index_seq = sequence_number;
+               ret = try_rotate_stream_index(stream);
+               if (ret < 0) {
+                       goto end;
+               }
+       }
+
+       stream->prev_data_seq = sequence_number;
+       ret = try_rotate_stream_data(stream);
+       if (ret < 0) {
+               goto end;
+       }
+end:
+       return ret;
+}
+
+int stream_add_index(struct relay_stream *stream,
+               const struct lttcomm_relayd_index *index_info)
+{
+       int ret = 0;
+       struct relay_index *index;
+
+       ASSERT_LOCKED(stream->lock);
+
+       /* Live beacon handling */
+       if (index_info->packet_size == 0) {
+               DBG("Received live beacon for stream %" PRIu64,
+                               stream->stream_handle);
+
+               /*
+                * Only flag a stream inactive when it has already
+                * received data and no indexes are in flight.
+                */
+               if (stream->index_received_seqcount > 0
+                               && stream->indexes_in_flight == 0) {
+                       stream->beacon_ts_end = index_info->timestamp_end;
+               }
+               ret = 0;
+               goto end;
+       } else {
+               stream->beacon_ts_end = -1ULL;
+       }
+
+       if (stream->ctf_stream_id == -1ULL) {
+               stream->ctf_stream_id = index_info->stream_id;
+       }
+
+       index = relay_index_get_by_id_or_create(stream, index_info->net_seq_num);
+       if (!index) {
+               ret = -1;
+               ERR("Failed to get or create index %" PRIu64,
+                               index_info->net_seq_num);
+               goto end;
+       }
+       if (relay_index_set_control_data(index, index_info,
+                       stream->trace->session->minor)) {
+               ERR("set_index_control_data error");
+               relay_index_put(index);
+               ret = -1;
+               goto end;
+       }
+       ret = relay_index_try_flush(index);
+       if (ret == 0) {
+               tracefile_array_commit_seq(stream->tfa);
+               stream->index_received_seqcount++;
+               stream->pos_after_last_complete_data_index += index->total_size;
+               stream->prev_index_seq = index_info->net_seq_num;
+
+               ret = try_rotate_stream_index(stream);
+               if (ret < 0) {
+                       goto end;
+               }
+       } else if (ret > 0) {
+               /* no flush. */
+               ret = 0;
+       } else {
+               /*
+                * ret < 0
+                *
+                * relay_index_try_flush is responsible for the self-reference
+                * put of the index object on error.
+                */
+               ERR("relay_index_try_flush error %d", ret);
+               ret = -1;
+       }
+end:
+       return ret;
+}
+
 static void print_stream_indexes(struct relay_stream *stream)
 {
        struct lttng_ht_iter iter;
 static void print_stream_indexes(struct relay_stream *stream)
 {
        struct lttng_ht_iter iter;
@@ -499,6 +1202,26 @@ static void print_stream_indexes(struct relay_stream *stream)
        rcu_read_unlock();
 }
 
        rcu_read_unlock();
 }
 
+int stream_reset_file(struct relay_stream *stream)
+{
+       ASSERT_LOCKED(stream->lock);
+
+       if (stream->stream_fd) {
+               stream_fd_put(stream->stream_fd);
+               stream->stream_fd = NULL;
+       }
+
+       stream->tracefile_size_current = 0;
+       stream->prev_data_seq = 0;
+       stream->prev_index_seq = 0;
+       /* Note that this does not reset the tracefile array. */
+       stream->tracefile_current_index = 0;
+       stream->pos_after_last_complete_data_index = 0;
+
+       return stream_create_data_output_file_from_trace_chunk(stream,
+                       stream->trace_chunk, true, &stream->stream_fd);
+}
+
 void print_relay_streams(void)
 {
        struct lttng_ht_iter iter;
 void print_relay_streams(void)
 {
        struct lttng_ht_iter iter;
index dfe15f1a045f28d9bde46a56a50603bf352710a9..b8d14ecfb8cb0d07b40553456bc0fae86a7d5f4e 100644 (file)
 
 #include <common/hashtable/hashtable.h>
 #include <common/trace-chunk.h>
 
 #include <common/hashtable/hashtable.h>
 #include <common/trace-chunk.h>
+#include <common/optional.h>
+#include <common/buffer-view.h>
 
 #include "session.h"
 #include "stream-fd.h"
 #include "tracefile-array.h"
 
 
 #include "session.h"
 #include "stream-fd.h"
 #include "tracefile-array.h"
 
+struct lttcomm_relayd_index;
+
+struct relay_stream_rotation {
+       /*
+        * Indicates if the stream's data and index have been rotated. A
+        * rotation is considered completed when both rotations have occurred.
+        */
+       bool data_rotated;
+       bool index_rotated;
+       /*
+        * Sequence number of the first packet of the new trace chunk to which
+        * the stream is rotating.
+        */
+       uint64_t seq_num;
+       struct lttng_trace_chunk *next_trace_chunk;
+};
+
 /*
  * Represents a stream in the relay
  */
 /*
  * Represents a stream in the relay
  */
@@ -61,26 +80,23 @@ struct relay_stream {
        struct lttng_index_file *index_file;
 
        char *path_name;
        struct lttng_index_file *index_file;
 
        char *path_name;
-       /*
-        * prev_path_name is only used for session rotation support.
-        * It is essentially used to work around the fact that index
-        * files are always created from the 'data' connection.
-        *
-        * Hence, it is possible to receive a ROTATE_STREAM command
-        * which affects the stream's path_name before the creation of
-        * an index file. In this situation, the index file of the
-        * 'previous' chunk would be created in the new destination folder.
-        *
-        * It would then be unlinked when the actual index of the new chunk
-        * is created.
-        */
-       char *prev_path_name;
        char *channel_name;
 
        /* On-disk circular buffer of tracefiles. */
        uint64_t tracefile_size;
        uint64_t tracefile_size_current;
        char *channel_name;
 
        /* On-disk circular buffer of tracefiles. */
        uint64_t tracefile_size;
        uint64_t tracefile_size_current;
+       /* Max number of trace files for this stream. */
        uint64_t tracefile_count;
        uint64_t tracefile_count;
+       /*
+        * Index of the currently active file for this stream's on-disk
+        * ring buffer.
+        */
+       uint64_t tracefile_current_index;
+       /*
+        * Indicates that the on-disk buffer has wrapped around. Stream
+        * files shall be unlinked before being opened after this has occurred.
+        */
+       bool tracefile_wrapped_around;
 
        /*
         * Position in the tracefile where we have the full index also on disk.
 
        /*
         * Position in the tracefile where we have the full index also on disk.
@@ -146,7 +162,8 @@ struct relay_stream {
         */
        bool in_recv_list;
        struct cds_list_head recv_node;
         */
        bool in_recv_list;
        struct cds_list_head recv_node;
-       bool published; /* Protected by session lock. */
+       /* Protected by session lock. */
+       bool published;
        /*
         * Node of stream within global stream hash table.
         */
        /*
         * Node of stream within global stream hash table.
         */
@@ -154,27 +171,11 @@ struct relay_stream {
        bool in_stream_ht;              /* is stream in stream hash table. */
        struct rcu_head rcu_node;       /* For call_rcu teardown. */
        /*
        bool in_stream_ht;              /* is stream in stream hash table. */
        struct rcu_head rcu_node;       /* For call_rcu teardown. */
        /*
-        * When we have written the data and index corresponding to this
-        * seq_num, rotate the tracefile (session rotation). The path_name is
-        * already up-to-date.
-        * This is set to -1ULL when no rotation is pending.
-        *
-        * Always access with stream lock held.
-        */
-       uint64_t rotate_at_seq_num;
-       /*
-        * When rotate_at_seq_num != -1ULL, meaning that a rotation is ongoing,
-        * data_rotated and index_rotated respectively indicate if the stream's
-        * data and index have been rotated. A rotation is considered completed
-        * when both rotations have occurred.
-        */
-       bool data_rotated;
-       bool index_rotated;
-       /*
-        * `trace_chunk` is the trace chunk to which the file currently
-        * being produced (if any) belongs.
+        * The trace chunk to which the file currently being produced (if any)
+        * belongs.
         */
        struct lttng_trace_chunk *trace_chunk;
         */
        struct lttng_trace_chunk *trace_chunk;
+       LTTNG_OPTIONAL(struct relay_stream_rotation) ongoing_rotation;
 };
 
 struct relay_stream *stream_create(struct ctf_trace *trace,
 };
 
 struct relay_stream *stream_create(struct ctf_trace *trace,
@@ -185,8 +186,28 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
 struct relay_stream *stream_get_by_id(uint64_t stream_id);
 bool stream_get(struct relay_stream *stream);
 void stream_put(struct relay_stream *stream);
 struct relay_stream *stream_get_by_id(uint64_t stream_id);
 bool stream_get(struct relay_stream *stream);
 void stream_put(struct relay_stream *stream);
+int stream_rotate_output_files(struct relay_session *session,
+               struct relay_stream *stream);
+int stream_set_pending_rotation(struct relay_stream *stream,
+               struct lttng_trace_chunk *next_trace_chunk,
+               uint64_t rotation_sequence_number);
 void try_stream_close(struct relay_stream *stream);
 void stream_publish(struct relay_stream *stream);
 void try_stream_close(struct relay_stream *stream);
 void stream_publish(struct relay_stream *stream);
+int stream_init_packet(struct relay_stream *stream, size_t packet_size,
+               bool *file_rotated);
+int stream_write(struct relay_stream *stream,
+               const struct lttng_buffer_view *packet, size_t padding_len);
+/* Called after the reception of a complete data packet. */
+int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
+               bool rotate_index, bool *flushed, uint64_t total_size);
+int stream_complete_packet(struct relay_stream *stream,
+               size_t packet_total_size, uint64_t sequence_number,
+               bool index_flushed);
+/* Index info is in host endianness. */
+int stream_add_index(struct relay_stream *stream,
+               const struct lttcomm_relayd_index *index_info);
+int stream_reset_file(struct relay_stream *stream);
+
 void print_relay_streams(void);
 
 #endif /* _STREAM_H */
 void print_relay_streams(void);
 
 #endif /* _STREAM_H */
index 3f35f91655b58b6da7287b476293f8504088776b..d6fb7037b663396a190615772e7860f319eabaff 100644 (file)
@@ -151,6 +151,7 @@ void lttng_directory_handle_fini(struct lttng_directory_handle *handle)
        ret = close(handle->dirfd);
        if (ret == -1) {
                PERROR("Failed to close directory file descriptor of directory handle");
        ret = close(handle->dirfd);
        if (ret == -1) {
                PERROR("Failed to close directory file descriptor of directory handle");
+               abort();
        }
 end:
        lttng_directory_handle_invalidate(handle);
        }
 end:
        lttng_directory_handle_invalidate(handle);
index 8bda682bd9bdd4620e4af359b5365c919f61d636..44a5b0fc1efa42505c03af8054b12212c86687e6 100644 (file)
@@ -78,7 +78,7 @@ int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
                uint64_t session_id);
 
 /*
                uint64_t session_id);
 
 /*
- * Create the  output files of a local stream.
+ * Create the output files of a local stream.
  *
  * This must be called with the channel's and the stream's lock held.
  */
  *
  * This must be called with the channel's and the stream's lock held.
  */
index 948a81d804428a01582b0466cb2e7433eb87dc4e..b2f4c2686d2f7f81b5bc21c4249a681e92a5bad3 100644 (file)
@@ -52,6 +52,7 @@
 #include <common/trace-chunk.h>
 #include <common/trace-chunk-registry.h>
 #include <common/string-utils/format.h>
 #include <common/trace-chunk.h>
 #include <common/trace-chunk-registry.h>
 #include <common/string-utils/format.h>
+#include <common/dynamic-array.h>
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -936,6 +937,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
                data_hdr.stream_id = htobe64(stream->relayd_stream_id);
                data_hdr.data_size = htobe32(data_size);
                data_hdr.padding_size = htobe32(padding);
                data_hdr.stream_id = htobe64(stream->relayd_stream_id);
                data_hdr.data_size = htobe32(data_size);
                data_hdr.padding_size = htobe32(padding);
+
                /*
                 * Note that net_seq_num below is assigned with the *current* value of
                 * next_net_seq_num and only after that the next_net_seq_num will be
                /*
                 * Note that net_seq_num below is assigned with the *current* value of
                 * next_net_seq_num and only after that the next_net_seq_num will be
@@ -4012,12 +4014,28 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
        struct lttng_consumer_stream *stream;
        struct lttng_ht_iter iter;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+       struct lttng_dynamic_array stream_rotation_positions;
+       uint64_t next_chunk_id, stream_count = 0;
+       enum lttng_trace_chunk_status chunk_status;
+       const bool is_local_trace = relayd_id == -1ULL;
+       struct consumer_relayd_sock_pair *relayd = NULL;
+       bool rotating_to_new_chunk = true;
 
        DBG("Consumer sample rotate position for channel %" PRIu64, key);
 
 
        DBG("Consumer sample rotate position for channel %" PRIu64, key);
 
+       lttng_dynamic_array_init(&stream_rotation_positions,
+                       sizeof(struct relayd_stream_rotation_position), NULL);
+
        rcu_read_lock();
 
        pthread_mutex_lock(&channel->lock);
        rcu_read_lock();
 
        pthread_mutex_lock(&channel->lock);
+       assert(channel->trace_chunk);
+       chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
+                       &next_chunk_id);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               ret = -1;
+               goto end_unlock_channel;
+       }
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
@@ -4032,6 +4050,10 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                 */
                pthread_mutex_lock(&stream->lock);
 
                 */
                pthread_mutex_lock(&stream->lock);
 
+               if (stream->trace_chunk == stream->chan->trace_chunk) {
+                       rotating_to_new_chunk = false;
+               }
+
                ret = lttng_consumer_sample_snapshot_positions(stream);
                if (ret < 0) {
                        ERR("Failed to sample snapshot position during channel rotation");
                ret = lttng_consumer_sample_snapshot_positions(stream);
                if (ret < 0) {
                        ERR("Failed to sample snapshot position during channel rotation");
@@ -4058,18 +4080,62 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        goto end_unlock_stream;
                }
 
                        goto end_unlock_stream;
                }
 
+               if (!is_local_trace) {
+                       const struct relayd_stream_rotation_position position = {
+                               .stream_id = stream->relayd_stream_id,
+                               .rotate_at_seq_num = (stream->rotate_position /
+                                               stream->max_sb_size) + 1,
+                       };
+
+                       ret = lttng_dynamic_array_add_element(
+                                       &stream_rotation_positions,
+                                       &position);
+                       if (ret) {
+                               ERR("Failed to allocate stream rotation position");
+                               goto end_unlock_stream;
+                       }
+                       stream_count++;
+               }
                pthread_mutex_unlock(&stream->lock);
        }
                pthread_mutex_unlock(&stream->lock);
        }
+       stream = NULL;
        pthread_mutex_unlock(&channel->lock);
 
        pthread_mutex_unlock(&channel->lock);
 
+       if (is_local_trace) {
+               ret = 0;
+               goto end;
+       }
+
+       relayd = consumer_find_relayd(relayd_id);
+       if (!relayd) {
+               ERR("Failed to find relayd %" PRIu64, relayd_id);
+               ret = -1;
+               goto end;
+       }
+
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+                       rotating_to_new_chunk ? &next_chunk_id : NULL,
+                       (const struct relayd_stream_rotation_position *)
+                                       stream_rotation_positions.buffer.data);
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       if (ret < 0) {
+               ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+                               relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+               goto end;
+       }
+
        ret = 0;
        goto end;
 
 end_unlock_stream:
        pthread_mutex_unlock(&stream->lock);
        ret = 0;
        goto end;
 
 end_unlock_stream:
        pthread_mutex_unlock(&stream->lock);
+end_unlock_channel:
        pthread_mutex_unlock(&channel->lock);
 end:
        rcu_read_unlock();
        pthread_mutex_unlock(&channel->lock);
 end:
        rcu_read_unlock();
+       lttng_dynamic_array_reset(&stream_rotation_positions);
        return ret;
 }
 
        return ret;
 }
 
@@ -4168,52 +4234,6 @@ end:
        return ret;
 }
 
        return ret;
 }
 
-/*
- * Perform the rotation a stream file on the relay.
- */
-int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream)
-{
-       int ret;
-       struct consumer_relayd_sock_pair *relayd;
-       uint64_t chunk_id;
-       enum lttng_trace_chunk_status chunk_status;
-
-       DBG("Rotate relay stream");
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (!relayd) {
-               ERR("Failed to find relayd");
-               ret = -1;
-               goto end;
-       }
-
-       chunk_status = lttng_trace_chunk_get_id(stream->chan->trace_chunk,
-                       &chunk_id);
-       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
-               ERR("Failed to retrieve the id of the current trace chunk of channel \"%s\"",
-                               stream->chan->name);
-               ret = -1;
-               goto end;
-       }
-
-       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-       ret = relayd_rotate_stream(&relayd->control_sock,
-                       stream->relayd_stream_id,
-                       chunk_id,
-                       stream->last_sequence_number);
-       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-       if (ret < 0) {
-               ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
-               lttng_consumer_cleanup_relayd(relayd);
-       }
-       if (ret) {
-               ERR("Rotate relay stream");
-       }
-
-end:
-       return ret;
-}
-
 /*
  * Performs the stream rotation for the rotate session feature if needed.
  * It must be called with the channel and stream locks held.
 /*
  * Performs the stream rotation for the rotate session feature if needed.
  * It must be called with the channel and stream locks held.
@@ -4255,14 +4275,12 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
                stream->trace_chunk = stream->chan->trace_chunk;
        }
 
                stream->trace_chunk = stream->chan->trace_chunk;
        }
 
-       if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               ret = rotate_relay_stream(ctx, stream);
-       } else {
+       if (stream->net_seq_idx == (uint64_t) -1ULL) {
                ret = rotate_local_stream(ctx, stream);
                ret = rotate_local_stream(ctx, stream);
-       }
-       if (ret < 0) {
-               ERR("Failed to rotate stream, ret = %i", ret);
-               goto error;
+               if (ret < 0) {
+                       ERR("Failed to rotate stream, ret = %i", ret);
+                       goto error;
+               }
        }
 
        if (stream->metadata_flag && stream->trace_chunk) {
        }
 
        if (stream->metadata_flag && stream->trace_chunk) {
@@ -4678,10 +4696,14 @@ enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
                const uint64_t *relayd_id, uint64_t session_id,
                uint64_t chunk_id)
 {
                const uint64_t *relayd_id, uint64_t session_id,
                uint64_t chunk_id)
 {
+       int ret;
        enum lttcomm_return_code ret_code;
        struct lttng_trace_chunk *chunk;
        char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
        const char *relayd_id_str = "(none)";
        enum lttcomm_return_code ret_code;
        struct lttng_trace_chunk *chunk;
        char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
        const char *relayd_id_str = "(none)";
+       const bool is_local_trace = !relayd_id;
+       struct consumer_relayd_sock_pair *relayd = NULL;
+       bool chunk_exists_remote;
 
        if (relayd_id) {
                int ret;
 
        if (relayd_id) {
                int ret;
@@ -4697,16 +4719,47 @@ enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
         }
 
        DBG("Consumer trace chunk exists command: relayd_id = %s"
         }
 
        DBG("Consumer trace chunk exists command: relayd_id = %s"
-                       ", session_id = %" PRIu64
                        ", chunk_id = %" PRIu64, relayd_id_str,
                        ", chunk_id = %" PRIu64, relayd_id_str,
-                       session_id, chunk_id);
+                       chunk_id);
        chunk = lttng_trace_chunk_registry_find_chunk(
                        consumer_data.chunk_registry, session_id,
                        chunk_id);
        DBG("Trace chunk %s locally", chunk ? "exists" : "does not exist");
        chunk = lttng_trace_chunk_registry_find_chunk(
                        consumer_data.chunk_registry, session_id,
                        chunk_id);
        DBG("Trace chunk %s locally", chunk ? "exists" : "does not exist");
-       ret_code = chunk ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL :
+       if (chunk) {
+               ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
+               lttng_trace_chunk_put(chunk);
+               goto end;
+       } else if (is_local_trace) {
+               ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+               goto end;
+       }
+
+       rcu_read_lock();
+       relayd = consumer_find_relayd(*relayd_id);
+       if (!relayd) {
+               ERR("Failed to find relayd %" PRIu64, *relayd_id);
+               ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+               goto end_rcu_unlock;
+       }
+       DBG("Looking up existence of trace chunk on relay daemon");
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id,
+                       &chunk_exists_remote);
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       if (ret < 0) {
+               ERR("Failed to look-up the existence of trace chunk on relay daemon");
+               ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+               goto end_rcu_unlock;
+       }
+
+       ret_code = chunk_exists_remote ?
+                       LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
                        LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
                        LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+       DBG("Trace chunk %s on relay daemon",
+                       chunk_exists_remote ? "exists" : "does not exist");
 
 
-       lttng_trace_chunk_put(chunk);
+end_rcu_unlock:
+       rcu_read_unlock();
+end:
        return ret_code;
 }
        return ret_code;
 }
index 599ce451c00580a042049d468d30b26be99561cf..26492c0e4a49f229a958b86a374cfb4938d72f7f 100644 (file)
@@ -313,7 +313,7 @@ struct lttng_consumer_stream {
        bool missed_metadata_flush;
 
        enum lttng_event_output output;
        bool missed_metadata_flush;
 
        enum lttng_event_output output;
-       /* Maximum subbuffer size. */
+       /* Maximum subbuffer size (in bytes). */
        unsigned long max_sb_size;
 
        /*
        unsigned long max_sb_size;
 
        /*
index a87c53a89d90fc1ec91dcb9f6ca6a2b48a779002..f79827ab277c279bc6caa7ac2b59a03dfbef1416 100644 (file)
@@ -218,6 +218,7 @@ static const char *error_string_array[] = {
        [ ERROR_INDEX(LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER) ] = "Trace chunk close failed on consumer",
        [ ERROR_INDEX(LTTNG_ERR_TRACE_CHUNK_EXISTS_FAIL_CONSUMER) ] = "Failed to query consumer for trace chunk existence",
        [ ERROR_INDEX(LTTNG_ERR_INVALID_PROTOCOL) ] = "Protocol error occurred",
        [ ERROR_INDEX(LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER) ] = "Trace chunk close failed on consumer",
        [ ERROR_INDEX(LTTNG_ERR_TRACE_CHUNK_EXISTS_FAIL_CONSUMER) ] = "Failed to query consumer for trace chunk existence",
        [ ERROR_INDEX(LTTNG_ERR_INVALID_PROTOCOL) ] = "Protocol error occurred",
+       [ ERROR_INDEX(LTTNG_ERR_FILE_CREATION_ERROR) ] = "Failed to create file",
 
        /* Last element */
        [ ERROR_INDEX(LTTNG_ERR_NR) ] = "Unknown error code"
 
        /* Last element */
        [ ERROR_INDEX(LTTNG_ERR_NR) ] = "Unknown error code"
index f7e4e9054258a1febfacb8b161386e4942c7124d..5a6fe3d8608bf56def8212199dcaaac540def172 100644 (file)
@@ -34,7 +34,7 @@
 struct lttng_index_file *lttng_index_file_create_from_trace_chunk(
                struct lttng_trace_chunk *chunk,
                const char *channel_path, char *stream_name,
 struct lttng_index_file *lttng_index_file_create_from_trace_chunk(
                struct lttng_trace_chunk *chunk,
                const char *channel_path, char *stream_name,
-               uint64_t stream_file_size, uint64_t stream_count,
+               uint64_t stream_file_size, uint64_t stream_file_index,
                uint32_t index_major, uint32_t index_minor,
                bool unlink_existing_file)
 {
                uint32_t index_major, uint32_t index_minor,
                bool unlink_existing_file)
 {
@@ -49,6 +49,9 @@ struct lttng_index_file *lttng_index_file_create_from_trace_chunk(
                        index_minor);
        const int flags = O_WRONLY | O_CREAT | O_TRUNC;
        const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
                        index_minor);
        const int flags = O_WRONLY | O_CREAT | O_TRUNC;
        const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+       bool acquired_reference = lttng_trace_chunk_get(chunk);
+
+       assert(acquired_reference);
 
        index_file = zmalloc(sizeof(*index_file));
        if (!index_file) {
 
        index_file = zmalloc(sizeof(*index_file));
        if (!index_file) {
@@ -56,6 +59,7 @@ struct lttng_index_file *lttng_index_file_create_from_trace_chunk(
                goto error;
        }
 
                goto error;
        }
 
+       index_file->trace_chunk = chunk;
        ret = snprintf(index_directory_path, sizeof(index_directory_path),
                        "%s/" DEFAULT_INDEX_DIR, channel_path);
        if (ret < 0 || ret >= sizeof(index_directory_path)) {
        ret = snprintf(index_directory_path, sizeof(index_directory_path),
                        "%s/" DEFAULT_INDEX_DIR, channel_path);
        if (ret < 0 || ret >= sizeof(index_directory_path)) {
@@ -64,7 +68,7 @@ struct lttng_index_file *lttng_index_file_create_from_trace_chunk(
        }
 
        ret = utils_stream_file_path(index_directory_path, stream_name,
        }
 
        ret = utils_stream_file_path(index_directory_path, stream_name,
-                       stream_file_size, stream_count,
+                       stream_file_size, stream_file_index,
                        DEFAULT_INDEX_FILE_SUFFIX,
                        index_file_path, sizeof(index_file_path));
        if (ret) {
                        DEFAULT_INDEX_FILE_SUFFIX,
                        index_file_path, sizeof(index_file_path));
        if (ret) {
@@ -290,6 +294,7 @@ static void lttng_index_file_release(struct urcu_ref *ref)
        if (close(index_file->fd)) {
                PERROR("close index fd");
        }
        if (close(index_file->fd)) {
                PERROR("close index fd");
        }
+       lttng_trace_chunk_put(index_file->trace_chunk);
        free(index_file);
 }
 
        free(index_file);
 }
 
index a999a83d9b74aef5bcc7ca79d0b2185db39127db..b5ffc314998b483acbb9f0df47da5e2fcef10a45 100644 (file)
@@ -31,6 +31,7 @@ struct lttng_index_file {
        uint32_t major;
        uint32_t minor;
        uint32_t element_len;
        uint32_t major;
        uint32_t minor;
        uint32_t element_len;
+       struct lttng_trace_chunk *trace_chunk;
        struct urcu_ref ref;
 };
 
        struct urcu_ref ref;
 };
 
index 91cbf762f90989809aaafd17623bee095492f4f9..363a0e7b6dc02f8214d92175f64f4d06d48aa5b8 100644 (file)
@@ -30,6 +30,7 @@
 #include <common/sessiond-comm/relayd.h>
 #include <common/index/ctf-index.h>
 #include <common/trace-chunk.h>
 #include <common/sessiond-comm/relayd.h>
 #include <common/index/ctf-index.h>
 #include <common/trace-chunk.h>
+#include <common/string-utils/format.h>
 
 #include "relayd.h"
 
 
 #include "relayd.h"
 
@@ -1121,82 +1122,97 @@ error:
        return ret;
 }
 
        return ret;
 }
 
-int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
-               uint64_t new_chunk_id, uint64_t seq_num)
+int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
+               unsigned int stream_count, uint64_t *new_chunk_id,
+               const struct relayd_stream_rotation_position *positions)
 {
        int ret;
 {
        int ret;
-       struct lttcomm_relayd_rotate_stream *msg = NULL;
-       struct lttcomm_relayd_generic_reply reply;
-       size_t len;
-       int msg_len;
-       /* FIXME */
-       char *new_pathname = NULL;
+       unsigned int i;
+       struct lttng_dynamic_buffer payload;
+       struct lttcomm_relayd_generic_reply reply = {};
+       const struct lttcomm_relayd_rotate_streams msg = {
+               .stream_count = htobe32((uint32_t) stream_count),
+               .new_chunk_id = (typeof(msg.new_chunk_id)) {
+                       .is_set = !!new_chunk_id,
+                       .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
+               },
+       };
+       char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
+       const char *new_chunk_id_str;
 
 
-       /* Code flow error. Safety net. */
-       assert(rsock);
+       lttng_dynamic_buffer_init(&payload);
 
 
-       DBG("Sending rotate stream id %" PRIu64 " command to relayd", stream_id);
+       /* Code flow error. Safety net. */
+       assert(sock);
 
 
-       /* Account for the trailing NULL. */
-       len = lttng_strnlen(new_pathname, LTTNG_PATH_MAX) + 1;
-       if (len > LTTNG_PATH_MAX) {
-               ERR("Path used in relayd rotate stream command exceeds the maximal allowed length");
-               ret = -1;
-               goto error;
+       if (new_chunk_id) {
+               ret = snprintf(new_chunk_id_buf, sizeof(new_chunk_id_buf),
+                               "%" PRIu64, *new_chunk_id);
+               if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
+                       new_chunk_id_str = "formatting error";
+               } else {
+                       new_chunk_id_str = new_chunk_id_buf;
+               }
+       } else {
+               new_chunk_id_str = "none";
        }
 
        }
 
-       msg_len = offsetof(struct lttcomm_relayd_rotate_stream, new_pathname) + len;
-       msg = zmalloc(msg_len);
-       if (!msg) {
-               PERROR("Failed to allocate relayd rotate stream command of %d bytes",
-                               msg_len);
-               ret = -1;
-               goto error;
-       }
+       DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
+                       new_chunk_id_str, stream_count);
 
 
-       if (lttng_strncpy(msg->new_pathname, new_pathname, len)) {
-               ret = -1;
-               ERR("Failed to copy relayd rotate stream command's new path name");
+       ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
+       if (ret) {
+               ERR("Failed to allocate \"rotate streams\" command payload");
                goto error;
        }
 
                goto error;
        }
 
-       msg->pathname_length = htobe32(len);
-       msg->stream_id = htobe64(stream_id);
-       msg->new_chunk_id = htobe64(new_chunk_id);
-       /*
-        * The seq_num is invalid for metadata streams, but it is ignored on
-        * the relay.
-        */
-       msg->rotate_at_seq_num = htobe64(seq_num);
+       for (i = 0; i < stream_count; i++) {
+               const struct relayd_stream_rotation_position *position =
+                               &positions[i];
+               const struct lttcomm_relayd_stream_rotation_position comm_position = {
+                       .stream_id = htobe64(position->stream_id),
+                       .rotate_at_seq_num = htobe64(
+                                       position->rotate_at_seq_num),
+               };
+
+               DBG("Rotate stream %" PRIu64 "at sequence number %" PRIu64,
+                               position->stream_id,
+                               position->rotate_at_seq_num);
+               ret = lttng_dynamic_buffer_append(&payload, &comm_position,
+                               sizeof(comm_position));
+               if (ret) {
+                       ERR("Failed to allocate \"rotate streams\" command payload");
+                       goto error;
+               }
+       }
 
        /* Send command. */
 
        /* Send command. */
-       ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) msg, msg_len, 0);
+       ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data,
+                       payload.size, 0);
        if (ret < 0) {
        if (ret < 0) {
-               ERR("Send rotate command");
+               ERR("Failed to send \"rotate stream\" command");
                goto error;
        }
 
        /* Receive response. */
                goto error;
        }
 
        /* Receive response. */
-       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(sock, &reply, sizeof(reply));
        if (ret < 0) {
        if (ret < 0) {
-               ERR("Receive rotate reply");
+               ERR("Failed to receive \"rotate streams\" command reply");
                goto error;
        }
 
        reply.ret_code = be32toh(reply.ret_code);
                goto error;
        }
 
        reply.ret_code = be32toh(reply.ret_code);
-
-       /* Return session id or negative ret code. */
        if (reply.ret_code != LTTNG_OK) {
                ret = -1;
        if (reply.ret_code != LTTNG_OK) {
                ret = -1;
-               ERR("Relayd rotate stream replied error %d", reply.ret_code);
+               ERR("Relayd rotate streams replied error %d", reply.ret_code);
        } else {
                /* Success. */
                ret = 0;
        } else {
                /* Success. */
                ret = 0;
-               DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id);
+               DBG("Relayd rotated streams successfully");
        }
 
 error:
        }
 
 error:
-       free(msg);
+       lttng_dynamic_buffer_reset(&payload);
        return ret;
 }
 
        return ret;
 }
 
@@ -1359,3 +1375,43 @@ int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
 end:
        return ret;
 }
 end:
        return ret;
 }
+
+int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
+               uint64_t chunk_id, bool *chunk_exists)
+{
+       int ret = 0;
+       struct lttcomm_relayd_trace_chunk_exists msg = {};
+       struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
+
+       msg = (typeof(msg)){
+                       .chunk_id = htobe64(chunk_id),
+       };
+
+       ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg),
+                       0);
+       if (ret < 0) {
+               ERR("Failed to send trace chunk exists command to relay daemon");
+               goto end;
+       }
+
+       ret = recv_reply(sock, &reply, sizeof(reply));
+       if (ret < 0) {
+               ERR("Failed to receive relay daemon trace chunk close command reply");
+               goto end;
+       }
+
+       reply.generic.ret_code = be32toh(reply.generic.ret_code);
+       if (reply.generic.ret_code != LTTNG_OK) {
+               ret = -1;
+               ERR("Relayd trace chunk close replied error %d",
+                               reply.generic.ret_code);
+       } else {
+               ret = 0;
+               DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
+                               ", exists = %s", chunk_id,
+                               reply.trace_chunk_exists ? "true" : "false");
+               *chunk_exists = !!reply.trace_chunk_exists;
+       }
+end:
+       return ret;
+}
index 033189bf14b6802075494c8329273f4faaca5335..3448095e54bb27bdc3a7d5a85df3a7132ee0b6aa 100644 (file)
 #define _RELAYD_H
 
 #include <unistd.h>
 #define _RELAYD_H
 
 #include <unistd.h>
+#include <stdbool.h>
 
 #include <common/sessiond-comm/relayd.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/trace-chunk.h>
 
 #include <common/sessiond-comm/relayd.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/trace-chunk.h>
+#include <common/dynamic-array.h>
+
+struct relayd_stream_rotation_position {
+       uint64_t stream_id;
+       /*
+        * Sequence number of the first packet belonging to the new
+        * "destination" trace chunk to which the stream is rotating.
+        *
+        * Ignored for metadata streams.
+        */
+       uint64_t rotate_at_seq_num;
+};
 
 int relayd_connect(struct lttcomm_relayd_sock *sock);
 int relayd_close(struct lttcomm_relayd_sock *sock);
 
 int relayd_connect(struct lttcomm_relayd_sock *sock);
 int relayd_close(struct lttcomm_relayd_sock *sock);
@@ -58,11 +71,15 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
                uint64_t net_seq_num);
 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
                uint64_t stream_id, uint64_t version);
                uint64_t net_seq_num);
 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
                uint64_t stream_id, uint64_t version);
-int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
-               uint64_t new_chunk_id, uint64_t seq_num);
+/* `positions` is an array of `stream_count` relayd_stream_rotation_position. */
+int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
+               unsigned int stream_count, uint64_t *new_chunk_id,
+               const struct relayd_stream_rotation_position *positions);
 int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
                struct lttng_trace_chunk *chunk);
 int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
                struct lttng_trace_chunk *chunk);
 int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
                struct lttng_trace_chunk *chunk);
 int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
                struct lttng_trace_chunk *chunk);
+int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
+               uint64_t chunk_id, bool *chunk_exists);
 
 #endif /* _RELAYD_H */
 
 #endif /* _RELAYD_H */
index 47f1d5cd738327290e91b27a91ee8b765522cde3..b22cdf4c51881b5b06289e2decdfaa1f15e887dd 100644 (file)
@@ -228,15 +228,26 @@ struct lttcomm_relayd_reset_metadata {
        uint64_t version;
 } LTTNG_PACKED;
 
        uint64_t version;
 } LTTNG_PACKED;
 
-struct lttcomm_relayd_rotate_stream {
+struct lttcomm_relayd_stream_rotation_position {
        uint64_t stream_id;
        uint64_t stream_id;
-       /* Ignored for metadata streams. */
+       /*
+        * Sequence number of the first packet belonging to the new
+        * "destination" trace chunk to which the stream is rotating.
+        *
+        * Ignored for metadata streams.
+        */
        uint64_t rotate_at_seq_num;
        uint64_t rotate_at_seq_num;
-       uint64_t new_chunk_id;
-       /* Includes trailing NULL. */
-       uint32_t pathname_length;
-       /* Must be the last member of this structure. */
-       char new_pathname[];
+} LTTNG_PACKED;
+
+struct lttcomm_relayd_rotate_streams {
+       uint32_t stream_count;
+       /*
+        * Streams can be rotated outside of a chunk but not be parented to
+        * a new chunk.
+        */
+       LTTNG_OPTIONAL_COMM(uint64_t) new_chunk_id;
+       /* `stream_count` positions follow. */
+       struct lttcomm_relayd_stream_rotation_position rotation_positions[];
 } LTTNG_PACKED;
 
 struct lttcomm_relayd_create_trace_chunk {
 } LTTNG_PACKED;
 
 struct lttcomm_relayd_create_trace_chunk {
@@ -256,4 +267,13 @@ struct lttcomm_relayd_close_trace_chunk {
        LTTNG_OPTIONAL_COMM(uint32_t) LTTNG_PACKED close_command;
 } LTTNG_PACKED;
 
        LTTNG_OPTIONAL_COMM(uint32_t) LTTNG_PACKED close_command;
 } LTTNG_PACKED;
 
+struct lttcomm_relayd_trace_chunk_exists {
+       uint64_t chunk_id;
+} LTTNG_PACKED;
+
+struct lttcomm_relayd_trace_chunk_exists_reply {
+       struct lttcomm_relayd_generic_reply generic;
+       uint8_t trace_chunk_exists;
+} LTTNG_PACKED;
+
 #endif /* _RELAYD_COMM */
 #endif /* _RELAYD_COMM */
index aeefdaaa6b95b076a804c54299bfbdf1a8425276..a1be2d0a029c72e6ddb0fb2b4abd8f66a9136e2e 100644 (file)
@@ -131,12 +131,14 @@ enum lttcomm_relayd_command {
        RELAYD_STREAMS_SENT                 = 16,
        /* Ask the relay to reset the metadata trace file (2.8+) */
        RELAYD_RESET_METADATA               = 17,
        RELAYD_STREAMS_SENT                 = 16,
        /* Ask the relay to reset the metadata trace file (2.8+) */
        RELAYD_RESET_METADATA               = 17,
-       /* Ask the relay to rotate a stream file (2.11+) */
-       RELAYD_ROTATE_STREAM                = 18,
+       /* Ask the relay to rotate a set of stream files (2.11+) */
+       RELAYD_ROTATE_STREAMS                = 18,
        /* Ask the relay to create a trace chunk (2.11+) */
        RELAYD_CREATE_TRACE_CHUNK           = 19,
        /* Ask the relay to close a trace chunk (2.11+) */
        RELAYD_CLOSE_TRACE_CHUNK            = 20,
        /* Ask the relay to create a trace chunk (2.11+) */
        RELAYD_CREATE_TRACE_CHUNK           = 19,
        /* Ask the relay to close a trace chunk (2.11+) */
        RELAYD_CLOSE_TRACE_CHUNK            = 20,
+       /* Ask the relay whether a trace chunk exists (2.11+) */
+       RELAYD_TRACE_CHUNK_EXISTS           = 21,
 };
 
 /*
 };
 
 /*
index 3f472b4353bc44aa8de15be627878640f96fca6c..172bd3df88e1d32247a93f06786f8c639de5225b 100644 (file)
@@ -827,7 +827,7 @@ void lttng_trace_chunk_move_to_completed(struct lttng_trace_chunk *trace_chunk)
                        LTTNG_OPTIONAL_GET(trace_chunk->timestamp_creation);
        const time_t close_timestamp =
                        LTTNG_OPTIONAL_GET(trace_chunk->timestamp_close);
                        LTTNG_OPTIONAL_GET(trace_chunk->timestamp_creation);
        const time_t close_timestamp =
                        LTTNG_OPTIONAL_GET(trace_chunk->timestamp_close);
-       LTTNG_OPTIONAL(struct lttng_directory_handle) archived_chunks_directory;
+       LTTNG_OPTIONAL(struct lttng_directory_handle) archived_chunks_directory = {};
 
        if (!trace_chunk->mode.is_set ||
                        trace_chunk->mode.value != TRACE_CHUNK_MODE_OWNER ||
 
        if (!trace_chunk->mode.is_set ||
                        trace_chunk->mode.value != TRACE_CHUNK_MODE_OWNER ||
index 732a30bf114fb89551bf322451dca89afbef10cf..664ba3e8e82abbdcf3a0c15614ba9b5858fbea93 100644 (file)
@@ -833,74 +833,6 @@ error:
        return ret;
 }
 
        return ret;
 }
 
-/*
- * Change the output tracefile according to the given size and count The
- * new_count pointer is set during this operation.
- *
- * From the consumer, the stream lock MUST be held before calling this function
- * because we are modifying the stream status.
- *
- * Return 0 on success or else a negative value.
- */
-LTTNG_HIDDEN
-int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size,
-               uint64_t count, int uid, int gid, int out_fd, uint64_t *new_count,
-               int *stream_fd)
-{
-       int ret;
-
-       assert(stream_fd);
-
-       ret = close(out_fd);
-       if (ret < 0) {
-               PERROR("Closing tracefile");
-               goto error;
-       }
-       *stream_fd = -1;
-
-       if (count > 0) {
-               /*
-                * In tracefile rotation, for the relay daemon we need
-                * to unlink the old file if present, because it may
-                * still be open in reading by the live thread, and we
-                * need to ensure that we do not overwrite the content
-                * between get_index and get_packet. Since we have no
-                * way to verify integrity of the data content compared
-                * to the associated index, we need to ensure the reader
-                * has exclusive access to the file content, and that
-                * the open of the data file is performed in get_index.
-                * Unlinking the old file rather than overwriting it
-                * achieves this.
-                */
-               if (new_count) {
-                       *new_count = (*new_count + 1) % count;
-               }
-               ret = utils_unlink_stream_file(path_name, file_name, size,
-                               new_count ? *new_count : 0, uid, gid, 0);
-               if (ret < 0 && errno != ENOENT) {
-                       goto error;
-               }
-       } else {
-               if (new_count) {
-                       (*new_count)++;
-               }
-       }
-
-       ret = utils_create_stream_file(path_name, file_name, size,
-                       new_count ? *new_count : 0, uid, gid, 0);
-       if (ret < 0) {
-               goto error;
-       }
-       *stream_fd = ret;
-
-       /* Success. */
-       ret = 0;
-
-error:
-       return ret;
-}
-
-
 /**
  * Parse a string that represents a size in human readable format. It
  * supports decimal integers suffixed by 'k', 'K', 'M' or 'G'.
 /**
  * Parse a string that represents a size in human readable format. It
  * supports decimal integers suffixed by 'k', 'K', 'M' or 'G'.
index ef02c275914bbd25ecec7cb350c8af05e19b5fff..57c7307bec3aaf37f3dc9af98946298e7339b02c 100644 (file)
@@ -48,9 +48,6 @@ int utils_create_stream_file(const char *path_name, char *file_name, uint64_t si
                uint64_t count, int uid, int gid, char *suffix);
 int utils_unlink_stream_file(const char *path_name, char *file_name, uint64_t size,
                uint64_t count, int uid, int gid, char *suffix);
                uint64_t count, int uid, int gid, char *suffix);
 int utils_unlink_stream_file(const char *path_name, char *file_name, uint64_t size,
                uint64_t count, int uid, int gid, char *suffix);
-int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size,
-               uint64_t count, int uid, int gid, int out_fd, uint64_t *new_count,
-               int *stream_fd);
 int utils_stream_file_path(const char *path_name, const char *file_name,
                uint64_t size, uint64_t count, const char *suffix,
                char *out_stream_path, size_t stream_path_len);
 int utils_stream_file_path(const char *path_name, const char *file_name,
                uint64_t size, uint64_t count, const char *suffix,
                char *out_stream_path, size_t stream_path_len);
This page took 0.124873 seconds and 5 git commands to generate.