Backport: relayd: use the fd-tracker to track stream_fd fds
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 29 Jun 2018 21:48:58 +0000 (17:48 -0400)
committerJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Fri, 21 Sep 2018 04:00:52 +0000 (00:00 -0400)
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/live.c
src/bin/lttng-relayd/main.c
src/bin/lttng-relayd/stream-fd.c
src/bin/lttng-relayd/stream-fd.h
src/bin/lttng-relayd/stream.c
src/common/fd-tracker/fd-tracker.c
src/common/utils.c
src/common/utils.h

index 38f19f0c289f8a224541ef9cf04043c1078bd326..03b4fa1c7e1aa12a24b2f940520e5ba8eb6718a4 100644 (file)
@@ -1455,12 +1455,8 @@ int viewer_get_next_index(struct relay_connection *conn)
                if (ret < 0) {
                        goto error_put;
                }
-               ret = open(fullpath, O_RDONLY);
-               if (ret < 0) {
-                       PERROR("Relay opening trace file");
-                       goto error_put;
-               }
-               vstream->stream_fd = stream_fd_create(ret);
+
+               vstream->stream_fd = stream_fd_open(fullpath);
                if (!vstream->stream_fd) {
                        if (close(ret)) {
                                PERROR("close");
@@ -1561,7 +1557,7 @@ error_put:
 static
 int viewer_get_packet(struct relay_connection *conn)
 {
-       int ret;
+       int ret, stream_fd;
        off_t lseek_ret;
        char *reply = NULL;
        struct lttng_viewer_get_packet get_packet_info;
@@ -1604,26 +1600,32 @@ int viewer_get_packet(struct relay_connection *conn)
        }
 
        pthread_mutex_lock(&vstream->stream->lock);
-       lseek_ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
-                       SEEK_SET);
+       stream_fd = stream_fd_get_fd(vstream->stream_fd);
+       if (stream_fd < 0) {
+               ERR("Failed to get viewer stream file descriptor");
+               goto error_put_fd;
+       }
+       lseek_ret = lseek(stream_fd, be64toh(get_packet_info.offset), SEEK_SET);
        if (lseek_ret < 0) {
-               PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd,
+               PERROR("lseek fd %d to offset %" PRIu64, stream_fd,
                        (uint64_t) be64toh(get_packet_info.offset));
-               goto error;
+               goto error_put_fd;
        }
-       read_len = lttng_read(vstream->stream_fd->fd,
+       read_len = lttng_read(stream_fd,
                        reply + sizeof(reply_header),
                        packet_data_len);
        if (read_len < packet_data_len) {
                PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
-                               vstream->stream_fd->fd,
+                               stream_fd,
                                (uint64_t) be64toh(get_packet_info.offset));
-               goto error;
+               goto error_put_fd;
        }
        reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
        reply_header.len = htobe32(packet_data_len);
        goto send_reply;
 
+error_put_fd:
+       stream_fd_put_fd(vstream->stream_fd);
 error:
        reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
 
@@ -1670,7 +1672,7 @@ end:
 static
 int viewer_get_metadata(struct relay_connection *conn)
 {
-       int ret = 0;
+       int ret = 0, stream_fd = -1;
        ssize_t read_len;
        uint64_t len = 0;
        char *data = NULL;
@@ -1730,16 +1732,9 @@ int viewer_get_metadata(struct relay_connection *conn)
                if (ret < 0) {
                        goto error;
                }
-               ret = open(fullpath, O_RDONLY);
-               if (ret < 0) {
-                       PERROR("Relay opening metadata file");
-                       goto error;
-               }
-               vstream->stream_fd = stream_fd_create(ret);
+               vstream->stream_fd = stream_fd_open(fullpath);
                if (!vstream->stream_fd) {
-                       if (close(ret)) {
-                               PERROR("close");
-                       }
+                       PERROR("Failed to open viewer stream file at %s", fullpath);
                        goto error;
                }
        }
@@ -1751,10 +1746,14 @@ int viewer_get_metadata(struct relay_connection *conn)
                goto error;
        }
 
-       read_len = lttng_read(vstream->stream_fd->fd, data, len);
+       stream_fd = stream_fd_get_fd(vstream->stream_fd);
+       if (stream_fd < 0) {
+               goto error_put_fd;
+       }
+       read_len = lttng_read(stream_fd, data, len);
        if (read_len < len) {
                PERROR("Relay reading metadata file");
-               goto error;
+               goto error_put_fd;
        }
        vstream->metadata_sent += read_len;
        if (vstream->metadata_sent == vstream->stream->metadata_received
@@ -1766,7 +1765,8 @@ int viewer_get_metadata(struct relay_connection *conn)
        reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
 
        goto send_reply;
-
+error_put_fd:
+       (void) stream_fd_put_fd(vstream->stream_fd);
 error:
        reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
 
index 123731e171c662cef4e69f4bacedbd42681be508..0f584487a09819b58de9602718bb9a197ffc59ef 100644 (file)
@@ -1593,9 +1593,8 @@ int relay_reset_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
                goto end_unlock;
        }
 
-       ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
-                       0, 0, -1, -1, stream->stream_fd->fd, NULL,
-                       &stream->stream_fd->fd);
+       ret = stream_fd_rotate(stream->stream_fd,
+                       stream->path_name, stream->channel_name, 0, 0, NULL);
        if (ret < 0) {
                ERR("Failed to rotate metadata file %s of channel %s",
                                stream->path_name, stream->channel_name);
@@ -1715,6 +1714,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
        struct lttcomm_relayd_metadata_payload metadata_payload_header;
        struct relay_stream *metadata_stream;
        uint64_t metadata_payload_size;
+       int metadata_fd = -1;
 
        if (!session) {
                ERR("Metadata sent before version check");
@@ -1745,20 +1745,24 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
 
        pthread_mutex_lock(&metadata_stream->lock);
 
-       size_ret = lttng_write(metadata_stream->stream_fd->fd,
+       metadata_fd = stream_fd_get_fd(metadata_stream->stream_fd);
+       if (metadata_fd < 0) {
+               goto end_put;
+       }
+       size_ret = lttng_write(metadata_fd,
                        payload->data + sizeof(metadata_payload_header),
                        metadata_payload_size);
        if (size_ret < metadata_payload_size) {
                ERR("Relay error writing metadata on file");
                ret = -1;
-               goto end_put;
+               goto end_put_fd;
        }
 
-       size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+       size_ret = write_padding_to_file(metadata_fd,
                        metadata_payload_header.padding_size);
        if (size_ret < (int64_t) metadata_payload_header.padding_size) {
                ret = -1;
-               goto end_put;
+               goto end_put_fd;
        }
 
        metadata_stream->metadata_received +=
@@ -1766,6 +1770,8 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
        DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
                metadata_stream->metadata_received);
 
+end_put_fd:
+       stream_fd_put_fd(metadata_stream->stream_fd);
 end_put:
        pthread_mutex_unlock(&metadata_stream->lock);
        stream_put(metadata_stream);
@@ -2746,11 +2752,9 @@ static enum relay_connection_status relay_process_data_receive_header(
                /* new_id is updated by utils_rotate_stream_file. */
                new_id = old_id;
 
-               ret = utils_rotate_stream_file(stream->path_name,
+               ret = stream_fd_rotate(stream->stream_fd, stream->path_name,
                                stream->channel_name, stream->tracefile_size,
-                               stream->tracefile_count, -1,
-                               -1, stream->stream_fd->fd,
-                               &new_id, &stream->stream_fd->fd);
+                               stream->tracefile_count, &new_id);
                if (ret < 0) {
                        ERR("Failed to rotate stream output file");
                        status = RELAY_CONNECTION_STATUS_ERROR;
@@ -2787,6 +2791,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
        bool new_stream = false, close_requested = false;
        uint64_t left_to_receive = state->left_to_receive;
        struct relay_session *session;
+       int stream_fd = -1;
 
        DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
                        state->header.stream_id, state->header.net_seq_num,
@@ -2811,6 +2816,12 @@ static enum relay_connection_status relay_process_data_receive_payload(
                }
        }
 
+       stream_fd = stream_fd_get_fd(stream->stream_fd);
+       if (stream_fd < 0) {
+               status = RELAY_CONNECTION_STATUS_ERROR;
+               goto end_stream_unlock;
+       }
+
        /*
         * The size of the "chunk" received on any iteration is bounded by:
         *   - the data left to receive,
@@ -2828,7 +2839,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
                                PERROR("Socket %d error", conn->sock->fd);
                                status = RELAY_CONNECTION_STATUS_ERROR;
                        }
-                       goto end_stream_unlock;
+                       goto end_put_fd;
                } else if (ret == 0) {
                        /* No more data ready to be consumed on socket. */
                        DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
@@ -2846,12 +2857,12 @@ static enum relay_connection_status relay_process_data_receive_payload(
                recv_size = ret;
 
                /* Write data to stream output fd. */
-               write_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+               write_ret = lttng_write(stream_fd, data_buffer,
                                recv_size);
                if (write_ret < (ssize_t) recv_size) {
                        ERR("Relay error writing data to file");
                        status = RELAY_CONNECTION_STATUS_ERROR;
-                       goto end_stream_unlock;
+                       goto end_put_fd;
                }
 
                left_to_receive -= recv_size;
@@ -2870,17 +2881,17 @@ static enum relay_connection_status relay_process_data_receive_payload(
                DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
                                state->header.stream_id, state->received,
                                state->left_to_receive);
-               goto end_stream_unlock;
+               goto end_put_fd;
        }
 
-       ret = write_padding_to_file(stream->stream_fd->fd,
+       ret = write_padding_to_file(stream_fd,
                        state->header.padding_size);
        if ((int64_t) ret < (int64_t) state->header.padding_size) {
                ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
                                stream->stream_handle,
                                state->header.net_seq_num, ret);
                status = RELAY_CONNECTION_STATUS_ERROR;
-               goto end_stream_unlock;
+               goto end_put_fd;
        }
 
 
@@ -2892,7 +2903,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
                                        stream->stream_handle,
                                        state->header.net_seq_num, ret);
                        status = RELAY_CONNECTION_STATUS_ERROR;
-                       goto end_stream_unlock;
+                       goto end_put_fd;
                }
        }
 
@@ -2913,6 +2924,8 @@ static enum relay_connection_status relay_process_data_receive_payload(
        connection_reset_protocol_state(conn);
        state = NULL;
 
+end_put_fd:
+       stream_fd_put_fd(stream->stream_fd);
 end_stream_unlock:
        close_requested = stream->close_requested;
        pthread_mutex_unlock(&stream->lock);
index e376fa1684062ef681939a0a7827d61b3f88825c..cc1b70f3fedbad68817277bc38ed9386e721b350 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *               2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
  */
 
 #define _LGPL_SOURCE
+
+#include <urcu/ref.h>
+#include <sys/stat.h>
+#include <fcntl.h>
 #include <common/common.h>
+#include <common/fd-tracker/fd-tracker.h>
+#include <common/fd-tracker/utils.h>
+#include <common/utils.h>
 
 #include "stream-fd.h"
+#include "lttng-relayd.h"
+
+struct stream_fd {
+       bool suspendable;
+       union {
+               /* Suspendable. */
+               struct fs_handle *handle;
+               /* Unsuspendable. */
+               int fd;
+       } u;
+       struct urcu_ref ref;
+};
 
-struct stream_fd *stream_fd_create(int fd)
+static struct stream_fd *_stream_fd_alloc(void)
 {
        struct stream_fd *sf;
 
@@ -29,11 +49,200 @@ struct stream_fd *stream_fd_create(int fd)
                goto end;
        }
        urcu_ref_init(&sf->ref);
-       sf->fd = fd;
 end:
        return sf;
 }
 
+static struct stream_fd *stream_fd_suspendable_create(struct fs_handle *handle)
+{
+       struct stream_fd *stream_fd = _stream_fd_alloc();
+
+       if (!stream_fd) {
+               goto end;
+       }
+
+       stream_fd->suspendable = true;
+       stream_fd->u.handle = handle;
+end:
+       return stream_fd;
+}
+
+static struct stream_fd *stream_fd_unsuspendable_create(int fd)
+{
+       struct stream_fd *stream_fd = _stream_fd_alloc();
+
+       if (!stream_fd) {
+               goto end;
+       }
+
+       stream_fd->suspendable = false;
+       stream_fd->u.fd = fd;
+end:
+       return stream_fd;
+}
+
+static int open_file(void *data, int *out_fd)
+{
+       int ret;
+       const char *path = data;
+
+       ret = open(path, O_RDONLY);
+       if (ret < 0) {
+               goto end;
+       }
+       *out_fd = ret;
+       ret = 0;
+end:
+       return ret;
+}
+
+/*
+ * Stream files are opened (read-only) on the live end of the relayd.
+ * In live mode, it is expected that a client is able to consume a
+ * complete file even if it is replaced (in file rotation mode).
+ *
+ * Thus, it is not possible to open those files as suspendable file
+ * handles. This means that live clients can keep a large number of
+ * open file descriptors. As a work-around, we could create hard links
+ * to the files to make the files suspendable. The original file would be
+ * replaced, but the viewer's hard-link would ensure that the inode is
+ * still available for restoration.
+ *
+ * The main roadblock to this approach is validating that the trace
+ * directory resides in a filesystem that supports hard-links. Otherwise,
+ * a cooperative mechanism could allow the viewer end to mark a file as
+ * being in use and it could be renamed rather than unlinked by the
+ * receiving end.
+ */
+struct stream_fd *stream_fd_open(const char *path)
+{
+       int ret, fd;
+       struct stream_fd *stream_fd = NULL;
+
+       ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &fd,
+                       (const char **) &path, 1,
+                       open_file, (void *) path);
+       if (ret) {
+               goto end;
+       }
+
+       stream_fd = stream_fd_unsuspendable_create(fd);
+       if (!stream_fd) {
+               (void) fd_tracker_close_unsuspendable_fd(the_fd_tracker, &fd, 1,
+                               fd_tracker_util_close_fd, NULL);
+       }
+end:
+       return stream_fd;
+}
+
+static
+struct fs_handle *create_fs_handle(const char *path)
+{
+       struct fs_handle *handle;
+       /*
+        * With the session rotation feature on the relay, we might need to seek
+        * and truncate a tracefile, so we need read and write access.
+        */
+       int flags = O_RDWR | O_CREAT | O_TRUNC;
+       /* Open with 660 mode */
+       mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+
+       handle =  fd_tracker_open_fs_handle(the_fd_tracker, path, flags, &mode);
+       if (!handle) {
+               ERR("Failed to open fs handle to %s", path);
+       }
+
+       return handle;
+}
+
+/*
+ * Stream file are created by on the consumerd/data-reception end. Those
+ * stream fds can be suspended as there is no expectation that the files
+ * will be unlinked and then need to be appended-to.
+ *
+ * Hence, the file descriptors are created as suspendable to allow the
+ * fd-tracker to reduce the number of active fds..
+ */
+struct stream_fd *stream_fd_create(const char *path_name, const char *file_name,
+               uint64_t size, uint64_t count, const char *suffix)
+{
+       struct stream_fd *stream_fd = NULL;
+       struct fs_handle *handle;
+       int ret;
+       char path[PATH_MAX];
+
+       ret = utils_stream_file_name(path, path_name, file_name,
+                       size, count, suffix);
+       if (ret < 0) {
+               goto end;
+       }
+
+       handle = create_fs_handle(path);
+       if (!handle) {
+               goto end;
+       }
+
+       stream_fd = stream_fd_suspendable_create(handle);
+       if (!stream_fd) {
+               (void) fs_handle_close(handle);
+       }
+       
+end:
+       return stream_fd;
+}
+
+int stream_fd_rotate(struct stream_fd *stream_fd, const char *path_name,
+               const char *file_name, uint64_t size,
+               uint64_t count, uint64_t *new_count)
+{
+       int ret;
+       bool should_unlink;
+       char path[PATH_MAX];
+
+       assert(stream_fd);
+       assert(stream_fd->suspendable);
+
+       utils_stream_file_rotation_get_new_count(count, new_count,
+                       &should_unlink);
+
+       ret = utils_stream_file_name(path, path_name, file_name,
+                       size, count, NULL);
+       if (ret < 0) {
+               goto error;
+       }
+
+       ret = fs_handle_close(stream_fd->u.handle);
+       stream_fd->u.handle = NULL;
+       if (ret < 0) {
+               PERROR("Closing stream tracefile handle");
+               goto error;
+       }
+       
+       if (should_unlink) {
+               unlink(path);
+               if (ret < 0 && errno != ENOENT) {
+                       goto error;
+               }
+       }
+
+       ret = utils_stream_file_name(path, path_name, file_name,
+                       size, new_count ? *new_count : 0, NULL);
+       if (ret < 0) {
+               goto error;
+       }
+
+       stream_fd->u.handle = create_fs_handle(path);
+       if (!stream_fd->u.handle) {
+               ret = -1;
+               goto error;
+       }
+
+       ret = 0;
+
+error:
+       return ret;
+}
+
 void stream_fd_get(struct stream_fd *sf)
 {
        urcu_ref_get(&sf->ref);
@@ -44,9 +253,14 @@ static void stream_fd_release(struct urcu_ref *ref)
        struct stream_fd *sf = caa_container_of(ref, struct stream_fd, ref);
        int ret;
 
-       ret = close(sf->fd);
+       if (sf->suspendable) {
+               ret = fs_handle_close(sf->u.handle);
+       } else {
+               ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &sf->u.fd,
+                               1, fd_tracker_util_close_fd, NULL);
+       }
        if (ret) {
-               PERROR("Error closing stream FD %d", sf->fd);
+               PERROR("Error closing stream handle");
        }
        free(sf);
 }
@@ -55,3 +269,15 @@ void stream_fd_put(struct stream_fd *sf)
 {
        urcu_ref_put(&sf->ref, stream_fd_release);
 }
+
+int stream_fd_get_fd(struct stream_fd *sf)
+{
+       return sf->suspendable ? fs_handle_get_fd(sf->u.handle) : sf->u.fd;
+}
+
+void stream_fd_put_fd(struct stream_fd *sf)
+{
+       if (sf->suspendable) {
+               fs_handle_put_fd(sf->u.handle);
+       }
+}
index 64f3b16a5f8d9ef2d7b3bd5191b8afdbe564d9a8..9e4dc80b435e83a641f250a508372634e1e728b4 100644 (file)
@@ -3,6 +3,7 @@
 
 /*
  * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *               2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#include <urcu/ref.h>
+#include <stdint.h>
 
-struct stream_fd {
-       int fd;
-       struct urcu_ref ref;
-};
+struct stream_fd;
 
-struct stream_fd *stream_fd_create(int fd);
+struct stream_fd *stream_fd_open(const char *path);
+struct stream_fd *stream_fd_create(const char *path_name, const char *file_name,
+               uint64_t size, uint64_t count, const char *suffix);
+int stream_fd_rotate(struct stream_fd *sf, const char *path_name,
+               const char *file_name, uint64_t size,
+               uint64_t count, uint64_t *new_count);
 void stream_fd_get(struct stream_fd *sf);
 void stream_fd_put(struct stream_fd *sf);
+int stream_fd_get_fd(struct stream_fd *sf);
+void stream_fd_put_fd(struct stream_fd *sf);
 
 #endif /* _STREAM_FD_H */
index da0b3fd2680bd689846f6c049491bfabf5ddcbae..e8a9fd9b4018fdecc1d0f2f6e77b97db623a1b5d 100644 (file)
@@ -123,13 +123,8 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
         * No need to use run_as API here because whatever we receive,
         * the relayd uses its own credentials for the stream files.
         */
-       ret = utils_create_stream_file(stream->path_name, stream->channel_name,
-                       stream->tracefile_size, 0, -1, -1, NULL);
-       if (ret < 0) {
-               ERR("Create output file");
-               goto end;
-       }
-       stream->stream_fd = stream_fd_create(ret);
+       stream->stream_fd = stream_fd_create(stream->path_name,
+                       stream->channel_name, stream->tracefile_size, 0, NULL);
        if (!stream->stream_fd) {
                if (close(ret)) {
                        PERROR("Error closing file %d", ret);
index 0331f56d0ac2cb2935e0ed9964cd19ae1f398cec..fbf46ec18193ecbbc1a39af6ae400e9a21c538cd 100644 (file)
@@ -797,6 +797,7 @@ int fs_handle_close(struct fs_handle *handle)
        pthread_mutex_lock(&handle->lock);
        fd_tracker_untrack(handle->tracker, handle);
        if (handle->fd >= 0) {
+               assert(!handle->in_use);
                /*
                 * The return value of close() is not propagated as there
                 * isn't much the user can do about it.
index d8a78091a4076fd90bb9d9956286dd1dfb9b8f2b..5ce597f5eaa59528e7df8fe38b47cf4159de1a7d 100644 (file)
@@ -712,7 +712,8 @@ int utils_mkdir_recursive(const char *path, mode_t mode, int uid, int gid)
  *
  * Return 0 on success or else a negative value.
  */
-static int utils_stream_file_name(char *path,
+LTTNG_HIDDEN
+int utils_stream_file_name(char *path,
                const char *path_name, const char *file_name,
                uint64_t size, uint64_t count,
                const char *suffix)
@@ -831,6 +832,36 @@ error:
        return ret;
 }
 
+LTTNG_HIDDEN
+void utils_stream_file_rotation_get_new_count(uint64_t count,
+               uint64_t *new_count, bool *should_unlink)
+{
+       if (count > 0) {
+               /*
+                * In tracefile rotation, for the relay daemon we need
+                * to unlink the old file if present, because it may
+                * still be open in reading by the live thread, and we
+                * need to ensure that we do not overwrite the content
+                * between get_index and get_packet. Since we have no
+                * way to verify integrity of the data content compared
+                * to the associated index, we need to ensure the reader
+                * has exclusive access to the file content, and that
+                * the open of the data file is performed in get_index.
+                * Unlinking the old file rather than overwriting it
+                * achieves this.
+                */
+               if (new_count) {
+                       *new_count = (*new_count + 1) % count;
+               }
+               *should_unlink = true;
+       } else {
+               if (new_count) {
+                       (*new_count)++;
+               }
+               *should_unlink = false;
+       }
+}
+
 /*
  * Change the output tracefile according to the given size and count The
  * new_count pointer is set during this operation.
@@ -846,9 +877,13 @@ int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size,
                int *stream_fd)
 {
        int ret;
+       bool should_unlink;
 
        assert(stream_fd);
 
+       utils_stream_file_rotation_get_new_count(count, new_count,
+                       &should_unlink);
+
        ret = close(out_fd);
        if (ret < 0) {
                PERROR("Closing tracefile");
@@ -856,32 +891,12 @@ int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size,
        }
        *stream_fd = -1;
 
-       if (count > 0) {
-               /*
-                * In tracefile rotation, for the relay daemon we need
-                * to unlink the old file if present, because it may
-                * still be open in reading by the live thread, and we
-                * need to ensure that we do not overwrite the content
-                * between get_index and get_packet. Since we have no
-                * way to verify integrity of the data content compared
-                * to the associated index, we need to ensure the reader
-                * has exclusive access to the file content, and that
-                * the open of the data file is performed in get_index.
-                * Unlinking the old file rather than overwriting it
-                * achieves this.
-                */
-               if (new_count) {
-                       *new_count = (*new_count + 1) % count;
-               }
+       if (should_unlink) {
                ret = utils_unlink_stream_file(path_name, file_name, size,
                                new_count ? *new_count : 0, uid, gid, 0);
                if (ret < 0 && errno != ENOENT) {
                        goto error;
                }
-       } else {
-               if (new_count) {
-                       (*new_count)++;
-               }
        }
 
        ret = utils_create_stream_file(path_name, file_name, size,
index 4c7884b765429f650fc3f7bb0d91651997e67467..b0f04368e41e2b61220acc372bdcdf63240c1679 100644 (file)
@@ -22,6 +22,7 @@
 #include <unistd.h>
 #include <stdint.h>
 #include <getopt.h>
+#include <stdbool.h>
 
 #define KIBI_LOG2 10
 #define MEBI_LOG2 20
@@ -39,10 +40,15 @@ int utils_set_fd_cloexec(int fd);
 int utils_create_pid_file(pid_t pid, const char *filepath);
 int utils_mkdir(const char *path, mode_t mode, int uid, int gid);
 int utils_mkdir_recursive(const char *path, mode_t mode, int uid, int gid);
+int utils_stream_file_name(char *path, const char *path_name,
+               const char *file_name, uint64_t size, uint64_t count,
+               const char *suffix);
 int utils_create_stream_file(const char *path_name, char *file_name, uint64_t size,
                uint64_t count, int uid, int gid, char *suffix);
 int utils_unlink_stream_file(const char *path_name, char *file_name, uint64_t size,
                uint64_t count, int uid, int gid, char *suffix);
+void utils_stream_file_rotation_get_new_count(uint64_t count,
+               uint64_t *new_count, bool *should_unlink);
 int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size,
                uint64_t count, int uid, int gid, int out_fd, uint64_t *new_count,
                int *stream_fd);
This page took 0.038213 seconds and 5 git commands to generate.