Implement the RELAYD_ROTATE_STREAM relay daemon command
authorJulien Desfossez <jdesfossez@efficios.com>
Thu, 14 Dec 2017 15:05:19 +0000 (10:05 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 13 Mar 2018 19:55:50 +0000 (15:55 -0400)
Support for the RELAYD_ROTATE_STREAM command on the relay. This
command informs the relay that the current stream must rotate after it
has written the data and index for the net_seq_num passed. After each
data and index written on disk we check if it is time to rotate (in
case it was in flight when the rotate command was received). On the
other hand, if too much data has been written when we receive the
rotate command, we move the excess data to a new tracefile and
truncate the current one.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/index.c
src/bin/lttng-relayd/index.h
src/bin/lttng-relayd/main.c
src/bin/lttng-relayd/stream.c
src/bin/lttng-relayd/stream.h
src/common/sessiond-comm/relayd.h
src/common/sessiond-comm/sessiond-comm.h
src/common/utils.c

index 1b14e3e078e1d96fb0d88f93301fac8e0c3e2d13..92d4581d124a95921694129cba29213c08a7bd90 100644 (file)
@@ -22,6 +22,7 @@
 
 #include <common/common.h>
 #include <common/utils.h>
+#include <common/compat/endian.h>
 
 #include "lttng-relayd.h"
 #include "stream.h"
@@ -354,3 +355,59 @@ uint64_t relay_index_find_last(struct relay_stream *stream)
        rcu_read_unlock();
        return net_seq_num;
 }
+
+/*
+ * Update the index file of an already existing relay_index.
+ * Offsets by 'removed_data_count' the offset field of an index.
+ */
+static
+int relay_index_switch_file(struct relay_index *index,
+               struct lttng_index_file *new_index_file,
+               uint64_t removed_data_count)
+{
+       int ret = 0;
+       uint64_t offset;
+
+       pthread_mutex_lock(&index->lock);
+       if (!index->index_file) {
+               ERR("No index_file");
+               ret = 0;
+               goto end;
+       }
+
+       lttng_index_file_put(index->index_file);
+       lttng_index_file_get(new_index_file);
+       index->index_file = new_index_file;
+       offset = be64toh(index->index_data.offset);
+       index->index_data.offset = htobe64(offset - removed_data_count);
+
+end:
+       pthread_mutex_unlock(&index->lock);
+       return ret;
+}
+
+/*
+ * Switch the index file of all pending indexes for a stream and update the
+ * data offset by substracting the last safe position.
+ * Stream lock must be held.
+ */
+int relay_index_switch_all_files(struct relay_stream *stream)
+{
+       struct lttng_ht_iter iter;
+       struct relay_index *index;
+       int ret = 0;
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+                       index, index_n.node) {
+               DBG("Update index to fd %d", stream->index_file->fd);
+               ret = relay_index_switch_file(index, stream->index_file,
+                               stream->pos_after_last_complete_data_index);
+               if (ret) {
+                       goto end;
+               }
+       }
+end:
+       rcu_read_unlock();
+       return ret;
+}
index dda5b910b5b8c8df0b88dcd1bda2fd50f9be8c66..08388c001872a221cbdd6f42960513619ca42759 100644 (file)
@@ -46,6 +46,8 @@ struct relay_index {
 
        /* Index packet data. This is the data that is written on disk. */
        struct ctf_packet_index index_data;
+       /* Data + padding size of this packet, filled by the data thread. */
+       uint64_t total_size;
 
        bool has_index_data;
        bool flushed;
@@ -73,5 +75,6 @@ int relay_index_try_flush(struct relay_index *index);
 void relay_index_close_all(struct relay_stream *stream);
 void relay_index_close_partial_fd(struct relay_stream *stream);
 uint64_t relay_index_find_last(struct relay_stream *stream);
+int relay_index_switch_all_files(struct relay_stream *stream);
 
 #endif /* _RELAY_INDEX_H */
index 4c99b56f67dcb23ee726b361e96d81d6ba0f0967..e8283f037e43a3a9429b57ec0e606a44097192f2 100644 (file)
@@ -54,6 +54,7 @@
 #include <common/sessiond-comm/relayd.h>
 #include <common/uri.h>
 #include <common/utils.h>
+#include <common/align.h>
 #include <common/config/session-config.h>
 #include <urcu/rculist.h>
 
@@ -93,6 +94,7 @@ static int lttng_relay_ready = NR_LTTNG_RELAY_READY;
 
 /* Size of receive buffer. */
 #define RECV_DATA_BUFFER_SIZE          65536
+#define FILE_COPY_BUFFER_SIZE          65536
 
 static int recv_child_signal;  /* Set to 1 when a SIGUSR1 signal is received. */
 static pid_t child_ppid;       /* Internal parent PID use with daemonize. */
@@ -1503,6 +1505,247 @@ end:
        return ret;
 }
 
+/*
+ * Close the current index file if it is open, and create a new one.
+ *
+ * Return 0 on success, -1 on error.
+ */
+static
+int create_rotate_index_file(struct relay_stream *stream)
+{
+       int ret;
+       uint32_t major, minor;
+
+       /* Put ref on previous index_file. */
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
+       }
+       major = stream->trace->session->major;
+       minor = stream->trace->session->minor;
+       stream->index_file = lttng_index_file_create(stream->path_name,
+                       stream->channel_name,
+                       -1, -1, stream->tracefile_size,
+                       tracefile_array_get_file_index_head(stream->tfa),
+                       lttng_to_index_major(major, minor),
+                       lttng_to_index_minor(major, minor));
+       if (!stream->index_file) {
+               ret = -1;
+               goto end;
+       }
+
+       ret = 0;
+
+end:
+       return ret;
+}
+
+static
+int do_rotate_stream(struct relay_stream *stream)
+{
+       int ret;
+
+       /* Perform the stream rotation. */
+       ret = utils_rotate_stream_file(stream->path_name,
+                       stream->channel_name, stream->tracefile_size,
+                       stream->tracefile_count, -1,
+                       -1, stream->stream_fd->fd,
+                       NULL, &stream->stream_fd->fd);
+       if (ret < 0) {
+               ERR("Rotating stream output file");
+               goto end;
+       }
+       stream->tracefile_size_current = 0;
+
+       /* Rotate also the index if the stream is not a metadata stream. */
+       if (!stream->is_metadata) {
+               ret = create_rotate_index_file(stream);
+               if (ret < 0) {
+                       ERR("Failed to rotate index file");
+                       goto end;
+               }
+       }
+
+       stream->rotate_at_seq_num = -1ULL;
+       stream->pos_after_last_complete_data_index = 0;
+
+end:
+       return ret;
+}
+
+/*
+ * If too much data has been written in a tracefile before we received the
+ * rotation command, we have to move the excess data to the new tracefile and
+ * perform the rotation. This can happen because the control and data
+ * connections are separate, the indexes as well as the commands arrive from
+ * the control connection and we have no control over the order so we could be
+ * in a situation where too much data has been received on the data connection
+ * before the rotation command on the control connection arrives. We don't need
+ * to update the index because its order is guaranteed with the rotation
+ * command message.
+ */
+static
+int rotate_truncate_stream(struct relay_stream *stream)
+{
+       int ret, new_fd;
+       uint64_t diff, pos = 0;
+       char buf[FILE_COPY_BUFFER_SIZE];
+
+       assert(!stream->is_metadata);
+
+       assert(stream->tracefile_size_current >
+                       stream->pos_after_last_complete_data_index);
+       diff = stream->tracefile_size_current -
+                       stream->pos_after_last_complete_data_index;
+
+       /* Create the new tracefile. */
+       new_fd = utils_create_stream_file(stream->path_name,
+                       stream->channel_name,
+                       stream->tracefile_size, stream->tracefile_count,
+                       /* uid */ -1, /* gid */ -1, /* suffix */ NULL);
+       if (new_fd < 0) {
+               ERR("Failed to create new stream file at path %s for channel %s",
+                               stream->path_name, stream->channel_name);
+               ret = -1;
+               goto end;
+       }
+
+       /*
+        * Rewind the current tracefile to the position at which the rotation
+        * should have occured.
+        */
+       ret = lseek(stream->stream_fd->fd,
+                       stream->pos_after_last_complete_data_index, SEEK_SET);
+       if (ret < 0) {
+               PERROR("seek truncate stream");
+               goto end;
+       }
+
+       /* Move data from the old file to the new file. */
+       while (pos < diff) {
+               uint64_t count, bytes_left;
+               ssize_t io_ret;
+
+               bytes_left = diff - pos;
+               count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left;
+               assert(count <= SIZE_MAX);
+
+               io_ret = lttng_read(stream->stream_fd->fd, buf, count);
+               if (io_ret < (ssize_t) count) {
+                       char error_string[256];
+
+                       snprintf(error_string, sizeof(error_string),
+                                       "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
+                                       count, stream->stream_fd->fd, io_ret);
+                       if (io_ret == -1) {
+                               PERROR("%s", error_string);
+                       } else {
+                               ERR("%s", error_string);
+                       }
+                       ret = -1;
+                       goto end;
+               }
+
+               io_ret = lttng_write(new_fd, buf, count);
+               if (io_ret < (ssize_t) count) {
+                       char error_string[256];
+
+                       snprintf(error_string, sizeof(error_string),
+                                       "Failed to write %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
+                                       count, new_fd, io_ret);
+                       if (io_ret == -1) {
+                               PERROR("%s", error_string);
+                       } else {
+                               ERR("%s", error_string);
+                       }
+                       ret = -1;
+                       goto end;
+               }
+
+               pos += count;
+       }
+
+       /* Truncate the file to get rid of the excess data. */
+       ret = ftruncate(stream->stream_fd->fd,
+                       stream->pos_after_last_complete_data_index);
+       if (ret) {
+               PERROR("ftruncate");
+               goto end;
+       }
+
+       ret = close(stream->stream_fd->fd);
+       if (ret < 0) {
+               PERROR("Closing tracefile");
+               goto end;
+       }
+
+       ret = create_rotate_index_file(stream);
+       if (ret < 0) {
+               ERR("Rotate stream index file");
+               goto end;
+       }
+
+       /*
+        * Update the offset and FD of all the eventual indexes created by the
+        * data connection before the rotation command arrived.
+        */
+       ret = relay_index_switch_all_files(stream);
+       if (ret < 0) {
+               ERR("Failed to rotate index file");
+               goto end;
+       }
+
+       stream->stream_fd->fd = new_fd;
+       stream->tracefile_size_current = diff;
+       stream->pos_after_last_complete_data_index = 0;
+       stream->rotate_at_seq_num = -1ULL;
+
+       ret = 0;
+
+end:
+       return ret;
+}
+
+/*
+ * Check if a stream should perform a rotation (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static
+int try_rotate_stream(struct relay_stream *stream)
+{
+       int ret = 0;
+
+       /* No rotation expected. */
+       if (stream->rotate_at_seq_num == -1ULL) {
+               goto end;
+       }
+
+       if (stream->prev_seq < stream->rotate_at_seq_num) {
+               DBG("Stream %" PRIu64 " no yet ready for rotation",
+                               stream->stream_handle);
+               goto end;
+       } else if (stream->prev_seq > stream->rotate_at_seq_num) {
+               DBG("Rotation after too much data has been written in tracefile "
+                               "for stream %" PRIu64 ", need to truncate before "
+                               "rotating", stream->stream_handle);
+               ret = rotate_truncate_stream(stream);
+               if (ret) {
+                       ERR("Failed to truncate stream");
+                       goto end;
+               }
+       } else {
+               /* stream->prev_seq == stream->rotate_at_seq_num */
+               DBG("Stream %" PRIu64 " ready for rotation",
+                               stream->stream_handle);
+               ret = do_rotate_stream(stream);
+       }
+
+end:
+       return ret;
+}
+
 /*
  * relay_recv_metadata: receive the metadata for the session.
  */
@@ -1586,6 +1829,11 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
        DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
                metadata_stream->metadata_received);
 
+       ret = try_rotate_stream(metadata_stream);
+       if (ret < 0) {
+               goto end_put;
+       }
+
 end_put:
        pthread_mutex_unlock(&metadata_stream->lock);
        stream_put(metadata_stream);
@@ -2049,6 +2297,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
        if (ret == 0) {
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
+               stream->pos_after_last_complete_data_index += index->total_size;
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
@@ -2122,6 +2371,145 @@ end_no_session:
        return ret;
 }
 
+/*
+ * relay_rotate_stream: rotate a stream to a new tracefile for the session
+ * rotation feature (not the tracefile rotation feature).
+ */
+static int relay_rotate_session_stream(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn)
+{
+       int ret, send_ret;
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_rotate_stream stream_info;
+       struct lttcomm_relayd_generic_reply reply;
+       struct relay_stream *stream;
+       size_t len;
+       char *new_pathname = NULL;
+
+       DBG("Rotate stream received");
+
+       if (!session || !conn->version_check_done) {
+               ERR("Trying to rotate a stream before version check");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       if (session->major == 2 && session->minor < 11) {
+               ERR("Unsupported feature before 2.11");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       memset(&stream_info, 0, sizeof(struct lttcomm_relayd_rotate_stream));
+
+       /*
+        * Receive the struct up to the new_pathname member since we don't know
+        * its size yet.
+        */
+       ret = conn->sock->ops->recvmsg(conn->sock, &stream_info,
+                       sizeof(struct lttcomm_relayd_rotate_stream), 0);
+       if (ret < sizeof(struct lttcomm_relayd_rotate_stream)) {
+               if (ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+               } else {
+                       ERR("Relay didn't receive valid rotate_stream struct size : %d", ret);
+               }
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       stream = stream_get_by_id(be64toh(stream_info.stream_id));
+       if (!stream) {
+               ret = -1;
+               goto end;
+       }
+
+       len = be32toh(stream_info.pathname_length);
+       /* Ensure it fits in local filename length. */
+       if (len >= LTTNG_PATH_MAX) {
+               ret = -ENAMETOOLONG;
+               ERR("Length of relay_rotate_session_stream command's path name (%zu bytes) exceeds the maximal allowed length of %i bytes",
+                               len, LTTNG_PATH_MAX);
+               goto end;
+       }
+
+       new_pathname = zmalloc(len);
+       if (!new_pathname) {
+               PERROR("Failed to allocation new path name of relay_rotate_session_stream command");
+               ret = -1;
+               goto end;
+       }
+
+       ret = conn->sock->ops->recvmsg(conn->sock, new_pathname, len, 0);
+       if (ret < len) {
+               if (ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+               } else {
+                       ERR("Relay didn't receive valid rotate_stream struct size : %d", ret);
+               }
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       pthread_mutex_lock(&stream->lock);
+
+       /*
+        * Update the trace path (just the folder, the stream name does not
+        * change).
+        */
+       free(stream->path_name);
+       stream->path_name = create_output_path(new_pathname);
+       if (!stream->path_name) {
+               ERR("Failed to create a new output path");
+               goto end_stream_unlock;
+       }
+       ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG,
+                       -1, -1);
+       if (ret < 0) {
+               ERR("relay creating output directory");
+               goto end_stream_unlock;
+       }
+       stream->chunk_id = be64toh(stream_info.new_chunk_id);
+
+       if (stream->is_metadata) {
+               /*
+                * The metadata stream is sent only over the control connection
+                * so we know we have all the data to perform the stream
+                * rotation.
+                */
+               ret = do_rotate_stream(stream);
+       } else {
+               stream->rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num);
+               ret = try_rotate_stream(stream);
+       }
+       if (ret < 0) {
+               goto end_stream_unlock;
+       }
+
+end_stream_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       stream_put(stream);
+end:
+       memset(&reply, 0, sizeof(reply));
+       if (ret < 0) {
+               reply.ret_code = htobe32(LTTNG_ERR_UNK);
+       } else {
+               reply.ret_code = htobe32(LTTNG_OK);
+       }
+       send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+                       sizeof(struct lttcomm_relayd_generic_reply), 0);
+       if (send_ret < 0) {
+               ERR("Failed to send reply of rotate session stream command");
+               ret = send_ret;
+       }
+
+end_no_reply:
+       free(new_pathname);
+       return ret;
+}
+
 /*
  * relay_mkdir: Create a folder on the disk.
  */
@@ -2449,6 +2837,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_RESET_METADATA:
                ret = relay_reset_metadata(recv_hdr, conn);
                break;
+       case RELAYD_ROTATE_STREAM:
+               ret = relay_rotate_session_stream(recv_hdr, conn);
+               break;
        case RELAYD_ROTATE_RENAME:
                ret = relay_rotate_rename(recv_hdr, conn);
                break;
@@ -2475,7 +2866,7 @@ end:
  * Return 0 on success else a negative value.
  */
 static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
-               int rotate_index)
+               int rotate_index, bool *flushed, uint64_t total_size)
 {
        int ret = 0;
        uint64_t data_offset;
@@ -2499,23 +2890,9 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
        }
 
        if (rotate_index || !stream->index_file) {
-               uint32_t major, minor;
-
-               /* Put ref on previous index_file. */
-               if (stream->index_file) {
-                       lttng_index_file_put(stream->index_file);
-                       stream->index_file = NULL;
-               }
-               major = stream->trace->session->major;
-               minor = stream->trace->session->minor;
-               stream->index_file = lttng_index_file_create(stream->path_name,
-                               stream->channel_name,
-                               -1, -1, stream->tracefile_size,
-                               tracefile_array_get_file_index_head(stream->tfa),
-                               lttng_to_index_major(major, minor),
-                               lttng_to_index_minor(major, minor));
-               if (!stream->index_file) {
-                       ret = -1;
+               ret = create_rotate_index_file(stream);
+               if (ret < 0) {
+                       ERR("Failed to rotate index");
                        /* Put self-ref for this index due to error. */
                        relay_index_put(index);
                        index = NULL;
@@ -2535,7 +2912,9 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
        if (ret == 0) {
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
+               *flushed = true;
        } else if (ret > 0) {
+               index->total_size = total_size;
                /* No flush. */
                ret = 0;
        } else {
@@ -2565,6 +2944,7 @@ static int relay_process_data(struct relay_connection *conn)
        size_t chunk_size = RECV_DATA_BUFFER_SIZE;
        size_t recv_off = 0;
        char data_buffer[chunk_size];
+       bool index_flushed = false;
 
        ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
                        sizeof(struct lttcomm_relayd_data_hdr), 0);
@@ -2630,7 +3010,9 @@ static int relay_process_data(struct relay_connection *conn)
         * snapshot and index are NOT supported.
         */
        if (session->minor >= 4 && !session->snapshot) {
-               ret = handle_index_data(stream, net_seq_num, rotate_index);
+               ret = handle_index_data(stream, net_seq_num, rotate_index,
+                               &index_flushed,
+                               data_size + be32toh(data_hdr.padding_size));
                if (ret < 0) {
                        ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
                                        stream->stream_handle, net_seq_num, ret);
@@ -2678,9 +3060,18 @@ static int relay_process_data(struct relay_connection *conn)
        if (stream->prev_seq == -1ULL) {
                new_stream = true;
        }
+       if (index_flushed) {
+               stream->pos_after_last_complete_data_index =
+                               stream->tracefile_size_current;
+       }
 
        stream->prev_seq = net_seq_num;
 
+       ret = try_rotate_stream(stream);
+       if (ret < 0) {
+               goto end_stream_unlock;
+       }
+
 end_stream_unlock:
        close_requested = stream->close_requested;
        pthread_mutex_unlock(&stream->lock);
index 2bc815813613054f58f39810b9c8df0dcade0117..5ed37c58ec1d2c0956b550ac837bac314d1e33a0 100644 (file)
@@ -88,6 +88,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
        stream->tracefile_count = tracefile_count;
        stream->path_name = path_name;
        stream->channel_name = channel_name;
+       stream->rotate_at_seq_num = -1ULL;
        lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
        pthread_mutex_init(&stream->lock, NULL);
        urcu_ref_init(&stream->ref);
index e385032cb71f98d4994dfad241e915f61e0cb28a..47ae2e8f9927fdcd672a595bf3c20902fd375846 100644 (file)
@@ -63,6 +63,11 @@ struct relay_stream {
        uint64_t tracefile_size_current;
        uint64_t tracefile_count;
 
+       /*
+        * Position in the tracefile where we have the full index also on disk.
+        */
+       uint64_t pos_after_last_complete_data_index;
+
        /*
         * Counts the number of received indexes. The "tag" associated
         * with an index is taken before incrementing this seqcount.
@@ -129,6 +134,24 @@ struct relay_stream {
        struct lttng_ht_node_u64 node;
        bool in_stream_ht;              /* is stream in stream hash table. */
        struct rcu_head rcu_node;       /* For call_rcu teardown. */
+       /*
+        * When we have written the data and index corresponding to this
+        * seq_num, rotate the tracefile (session rotation). The path_name is
+        * already up-to-date.
+        * This is set to -1ULL when no rotation is pending.
+        *
+        * Always access with stream lock held.
+        */
+       uint64_t rotate_at_seq_num;
+       /*
+        * This is the id of the chunk where we are writing to if no rotation is
+        * pending (rotate_at_seq_num == -1ULL). If a rotation is pending, this
+        * is the chunk_id we will have after the rotation. It must be updated
+        * atomically with rotate_at_seq_num.
+        *
+        * Always access with stream lock held.
+        */
+       uint64_t chunk_id;
 };
 
 struct relay_stream *stream_create(struct ctf_trace *trace,
index 52482e017a4b58ce527c1bc8275ad568f0d9d108..444c43af0aae454b32ecb8b86aedf33ed681f1b2 100644 (file)
@@ -195,6 +195,16 @@ struct lttcomm_relayd_reset_metadata {
        uint64_t version;
 } LTTNG_PACKED;
 
+struct lttcomm_relayd_rotate_stream {
+       uint64_t stream_id;
+       uint64_t rotate_at_seq_num;
+       uint64_t new_chunk_id;
+       /* Includes trailing NULL. */
+       uint32_t pathname_length;
+       /* Must be the last member of this structure. */
+       char new_pathname[];
+} LTTNG_PACKED;
+
 struct lttcomm_relayd_rotate_rename {
        uint32_t old_path_length;
        uint32_t new_path_length;
index 6f8249b6ab0801450a81196fce2d8acca239e196..78c7f5e2b6b9a1fc3cbf8a5fc55c15f1bc2d87d9 100644 (file)
@@ -123,6 +123,8 @@ enum lttcomm_relayd_command {
        RELAYD_STREAMS_SENT                 = 16,
        /* Ask the relay to reset the metadata trace file (2.8+) */
        RELAYD_RESET_METADATA               = 17,
+       /* Ask the relay to rotate a stream file (2.11+) */
+       RELAYD_ROTATE_STREAM                = 18,
        /* Rename a chunk after the rotation is completed (2.11+) */
        RELAYD_ROTATE_RENAME                = 19,
        /* Create a folder on the relayd FS (2.11+) */
index 182b6eed7c700cd53579a4bac3cf74a0b1d42f8b..f1e0148cb9cb9cd28159846141a99fc108b95a58 100644 (file)
@@ -785,7 +785,11 @@ int utils_create_stream_file(const char *path_name, char *file_name, uint64_t si
                goto error;
        }
 
-       flags = O_WRONLY | O_CREAT | O_TRUNC;
+       /*
+        * With the session rotation feature on the relay, we might need to seek
+        * and truncate a tracefile, so we need read and write access.
+        */
+       flags = O_RDWR | O_CREAT | O_TRUNC;
        /* Open with 660 mode */
        mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
 
This page took 0.036446 seconds and 5 git commands to generate.