From bdafebb5195cfda710b32b019f9cac6c4d86f42a Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Fri, 29 Jun 2018 17:48:58 -0400 Subject: [PATCH] Backport: relayd: use the fd-tracker to track stream_fd fds MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérémie Galarneau --- src/bin/lttng-relayd/live.c | 54 +++---- src/bin/lttng-relayd/main.c | 49 +++--- src/bin/lttng-relayd/stream-fd.c | 234 ++++++++++++++++++++++++++++- src/bin/lttng-relayd/stream-fd.h | 17 ++- src/bin/lttng-relayd/stream.c | 9 +- src/common/fd-tracker/fd-tracker.c | 1 + src/common/utils.c | 59 +++++--- src/common/utils.h | 6 + 8 files changed, 345 insertions(+), 84 deletions(-) diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 38f19f0c2..03b4fa1c7 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -1455,12 +1455,8 @@ int viewer_get_next_index(struct relay_connection *conn) if (ret < 0) { goto error_put; } - ret = open(fullpath, O_RDONLY); - if (ret < 0) { - PERROR("Relay opening trace file"); - goto error_put; - } - vstream->stream_fd = stream_fd_create(ret); + + vstream->stream_fd = stream_fd_open(fullpath); if (!vstream->stream_fd) { if (close(ret)) { PERROR("close"); @@ -1561,7 +1557,7 @@ error_put: static int viewer_get_packet(struct relay_connection *conn) { - int ret; + int ret, stream_fd; off_t lseek_ret; char *reply = NULL; struct lttng_viewer_get_packet get_packet_info; @@ -1604,26 +1600,32 @@ int viewer_get_packet(struct relay_connection *conn) } pthread_mutex_lock(&vstream->stream->lock); - lseek_ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset), - SEEK_SET); + stream_fd = stream_fd_get_fd(vstream->stream_fd); + if (stream_fd < 0) { + ERR("Failed to get viewer stream file descriptor"); + goto error_put_fd; + } + lseek_ret = lseek(stream_fd, be64toh(get_packet_info.offset), SEEK_SET); if (lseek_ret < 0) { - PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd, + PERROR("lseek fd %d to offset %" PRIu64, stream_fd, (uint64_t) be64toh(get_packet_info.offset)); - goto error; + goto error_put_fd; } - read_len = lttng_read(vstream->stream_fd->fd, + read_len = lttng_read(stream_fd, reply + sizeof(reply_header), packet_data_len); if (read_len < packet_data_len) { PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64, - vstream->stream_fd->fd, + stream_fd, (uint64_t) be64toh(get_packet_info.offset)); - goto error; + goto error_put_fd; } reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK); reply_header.len = htobe32(packet_data_len); goto send_reply; +error_put_fd: + stream_fd_put_fd(vstream->stream_fd); error: reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); @@ -1670,7 +1672,7 @@ end: static int viewer_get_metadata(struct relay_connection *conn) { - int ret = 0; + int ret = 0, stream_fd = -1; ssize_t read_len; uint64_t len = 0; char *data = NULL; @@ -1730,16 +1732,9 @@ int viewer_get_metadata(struct relay_connection *conn) if (ret < 0) { goto error; } - ret = open(fullpath, O_RDONLY); - if (ret < 0) { - PERROR("Relay opening metadata file"); - goto error; - } - vstream->stream_fd = stream_fd_create(ret); + vstream->stream_fd = stream_fd_open(fullpath); if (!vstream->stream_fd) { - if (close(ret)) { - PERROR("close"); - } + PERROR("Failed to open viewer stream file at %s", fullpath); goto error; } } @@ -1751,10 +1746,14 @@ int viewer_get_metadata(struct relay_connection *conn) goto error; } - read_len = lttng_read(vstream->stream_fd->fd, data, len); + stream_fd = stream_fd_get_fd(vstream->stream_fd); + if (stream_fd < 0) { + goto error_put_fd; + } + read_len = lttng_read(stream_fd, data, len); if (read_len < len) { PERROR("Relay reading metadata file"); - goto error; + goto error_put_fd; } vstream->metadata_sent += read_len; if (vstream->metadata_sent == vstream->stream->metadata_received @@ -1766,7 +1765,8 @@ int viewer_get_metadata(struct relay_connection *conn) reply.status = htobe32(LTTNG_VIEWER_METADATA_OK); goto send_reply; - +error_put_fd: + (void) stream_fd_put_fd(vstream->stream_fd); error: reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 123731e17..0f584487a 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1593,9 +1593,8 @@ int relay_reset_metadata(const struct lttcomm_relayd_hdr *recv_hdr, goto end_unlock; } - ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, - 0, 0, -1, -1, stream->stream_fd->fd, NULL, - &stream->stream_fd->fd); + ret = stream_fd_rotate(stream->stream_fd, + stream->path_name, stream->channel_name, 0, 0, NULL); if (ret < 0) { ERR("Failed to rotate metadata file %s of channel %s", stream->path_name, stream->channel_name); @@ -1715,6 +1714,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_metadata_payload metadata_payload_header; struct relay_stream *metadata_stream; uint64_t metadata_payload_size; + int metadata_fd = -1; if (!session) { ERR("Metadata sent before version check"); @@ -1745,20 +1745,24 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, pthread_mutex_lock(&metadata_stream->lock); - size_ret = lttng_write(metadata_stream->stream_fd->fd, + metadata_fd = stream_fd_get_fd(metadata_stream->stream_fd); + if (metadata_fd < 0) { + goto end_put; + } + size_ret = lttng_write(metadata_fd, payload->data + sizeof(metadata_payload_header), metadata_payload_size); if (size_ret < metadata_payload_size) { ERR("Relay error writing metadata on file"); ret = -1; - goto end_put; + goto end_put_fd; } - size_ret = write_padding_to_file(metadata_stream->stream_fd->fd, + size_ret = write_padding_to_file(metadata_fd, metadata_payload_header.padding_size); if (size_ret < (int64_t) metadata_payload_header.padding_size) { ret = -1; - goto end_put; + goto end_put_fd; } metadata_stream->metadata_received += @@ -1766,6 +1770,8 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, DBG2("Relay metadata written. Updated metadata_received %" PRIu64, metadata_stream->metadata_received); +end_put_fd: + stream_fd_put_fd(metadata_stream->stream_fd); end_put: pthread_mutex_unlock(&metadata_stream->lock); stream_put(metadata_stream); @@ -2746,11 +2752,9 @@ static enum relay_connection_status relay_process_data_receive_header( /* new_id is updated by utils_rotate_stream_file. */ new_id = old_id; - ret = utils_rotate_stream_file(stream->path_name, + ret = stream_fd_rotate(stream->stream_fd, stream->path_name, stream->channel_name, stream->tracefile_size, - stream->tracefile_count, -1, - -1, stream->stream_fd->fd, - &new_id, &stream->stream_fd->fd); + stream->tracefile_count, &new_id); if (ret < 0) { ERR("Failed to rotate stream output file"); status = RELAY_CONNECTION_STATUS_ERROR; @@ -2787,6 +2791,7 @@ static enum relay_connection_status relay_process_data_receive_payload( bool new_stream = false, close_requested = false; uint64_t left_to_receive = state->left_to_receive; struct relay_session *session; + int stream_fd = -1; DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive", state->header.stream_id, state->header.net_seq_num, @@ -2811,6 +2816,12 @@ static enum relay_connection_status relay_process_data_receive_payload( } } + stream_fd = stream_fd_get_fd(stream->stream_fd); + if (stream_fd < 0) { + status = RELAY_CONNECTION_STATUS_ERROR; + goto end_stream_unlock; + } + /* * The size of the "chunk" received on any iteration is bounded by: * - the data left to receive, @@ -2828,7 +2839,7 @@ static enum relay_connection_status relay_process_data_receive_payload( PERROR("Socket %d error", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; } - goto end_stream_unlock; + goto end_put_fd; } else if (ret == 0) { /* No more data ready to be consumed on socket. */ DBG3("No more data ready for consumption on data socket of stream id %" PRIu64, @@ -2846,12 +2857,12 @@ static enum relay_connection_status relay_process_data_receive_payload( recv_size = ret; /* Write data to stream output fd. */ - write_ret = lttng_write(stream->stream_fd->fd, data_buffer, + write_ret = lttng_write(stream_fd, data_buffer, recv_size); if (write_ret < (ssize_t) recv_size) { ERR("Relay error writing data to file"); status = RELAY_CONNECTION_STATUS_ERROR; - goto end_stream_unlock; + goto end_put_fd; } left_to_receive -= recv_size; @@ -2870,17 +2881,17 @@ static enum relay_connection_status relay_process_data_receive_payload( DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive", state->header.stream_id, state->received, state->left_to_receive); - goto end_stream_unlock; + goto end_put_fd; } - ret = write_padding_to_file(stream->stream_fd->fd, + ret = write_padding_to_file(stream_fd, state->header.padding_size); if ((int64_t) ret < (int64_t) state->header.padding_size) { ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", stream->stream_handle, state->header.net_seq_num, ret); status = RELAY_CONNECTION_STATUS_ERROR; - goto end_stream_unlock; + goto end_put_fd; } @@ -2892,7 +2903,7 @@ static enum relay_connection_status relay_process_data_receive_payload( stream->stream_handle, state->header.net_seq_num, ret); status = RELAY_CONNECTION_STATUS_ERROR; - goto end_stream_unlock; + goto end_put_fd; } } @@ -2913,6 +2924,8 @@ static enum relay_connection_status relay_process_data_receive_payload( connection_reset_protocol_state(conn); state = NULL; +end_put_fd: + stream_fd_put_fd(stream->stream_fd); end_stream_unlock: close_requested = stream->close_requested; pthread_mutex_unlock(&stream->lock); diff --git a/src/bin/lttng-relayd/stream-fd.c b/src/bin/lttng-relayd/stream-fd.c index e376fa168..cc1b70f3f 100644 --- a/src/bin/lttng-relayd/stream-fd.c +++ b/src/bin/lttng-relayd/stream-fd.c @@ -1,5 +1,6 @@ /* * Copyright (C) 2015 - Mathieu Desnoyers + * 2018 - Jérémie Galarneau * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,11 +17,30 @@ */ #define _LGPL_SOURCE + +#include +#include +#include #include +#include +#include +#include #include "stream-fd.h" +#include "lttng-relayd.h" + +struct stream_fd { + bool suspendable; + union { + /* Suspendable. */ + struct fs_handle *handle; + /* Unsuspendable. */ + int fd; + } u; + struct urcu_ref ref; +}; -struct stream_fd *stream_fd_create(int fd) +static struct stream_fd *_stream_fd_alloc(void) { struct stream_fd *sf; @@ -29,11 +49,200 @@ struct stream_fd *stream_fd_create(int fd) goto end; } urcu_ref_init(&sf->ref); - sf->fd = fd; end: return sf; } +static struct stream_fd *stream_fd_suspendable_create(struct fs_handle *handle) +{ + struct stream_fd *stream_fd = _stream_fd_alloc(); + + if (!stream_fd) { + goto end; + } + + stream_fd->suspendable = true; + stream_fd->u.handle = handle; +end: + return stream_fd; +} + +static struct stream_fd *stream_fd_unsuspendable_create(int fd) +{ + struct stream_fd *stream_fd = _stream_fd_alloc(); + + if (!stream_fd) { + goto end; + } + + stream_fd->suspendable = false; + stream_fd->u.fd = fd; +end: + return stream_fd; +} + +static int open_file(void *data, int *out_fd) +{ + int ret; + const char *path = data; + + ret = open(path, O_RDONLY); + if (ret < 0) { + goto end; + } + *out_fd = ret; + ret = 0; +end: + return ret; +} + +/* + * Stream files are opened (read-only) on the live end of the relayd. + * In live mode, it is expected that a client is able to consume a + * complete file even if it is replaced (in file rotation mode). + * + * Thus, it is not possible to open those files as suspendable file + * handles. This means that live clients can keep a large number of + * open file descriptors. As a work-around, we could create hard links + * to the files to make the files suspendable. The original file would be + * replaced, but the viewer's hard-link would ensure that the inode is + * still available for restoration. + * + * The main roadblock to this approach is validating that the trace + * directory resides in a filesystem that supports hard-links. Otherwise, + * a cooperative mechanism could allow the viewer end to mark a file as + * being in use and it could be renamed rather than unlinked by the + * receiving end. + */ +struct stream_fd *stream_fd_open(const char *path) +{ + int ret, fd; + struct stream_fd *stream_fd = NULL; + + ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &fd, + (const char **) &path, 1, + open_file, (void *) path); + if (ret) { + goto end; + } + + stream_fd = stream_fd_unsuspendable_create(fd); + if (!stream_fd) { + (void) fd_tracker_close_unsuspendable_fd(the_fd_tracker, &fd, 1, + fd_tracker_util_close_fd, NULL); + } +end: + return stream_fd; +} + +static +struct fs_handle *create_fs_handle(const char *path) +{ + struct fs_handle *handle; + /* + * With the session rotation feature on the relay, we might need to seek + * and truncate a tracefile, so we need read and write access. + */ + int flags = O_RDWR | O_CREAT | O_TRUNC; + /* Open with 660 mode */ + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; + + handle = fd_tracker_open_fs_handle(the_fd_tracker, path, flags, &mode); + if (!handle) { + ERR("Failed to open fs handle to %s", path); + } + + return handle; +} + +/* + * Stream file are created by on the consumerd/data-reception end. Those + * stream fds can be suspended as there is no expectation that the files + * will be unlinked and then need to be appended-to. + * + * Hence, the file descriptors are created as suspendable to allow the + * fd-tracker to reduce the number of active fds.. + */ +struct stream_fd *stream_fd_create(const char *path_name, const char *file_name, + uint64_t size, uint64_t count, const char *suffix) +{ + struct stream_fd *stream_fd = NULL; + struct fs_handle *handle; + int ret; + char path[PATH_MAX]; + + ret = utils_stream_file_name(path, path_name, file_name, + size, count, suffix); + if (ret < 0) { + goto end; + } + + handle = create_fs_handle(path); + if (!handle) { + goto end; + } + + stream_fd = stream_fd_suspendable_create(handle); + if (!stream_fd) { + (void) fs_handle_close(handle); + } + +end: + return stream_fd; +} + +int stream_fd_rotate(struct stream_fd *stream_fd, const char *path_name, + const char *file_name, uint64_t size, + uint64_t count, uint64_t *new_count) +{ + int ret; + bool should_unlink; + char path[PATH_MAX]; + + assert(stream_fd); + assert(stream_fd->suspendable); + + utils_stream_file_rotation_get_new_count(count, new_count, + &should_unlink); + + ret = utils_stream_file_name(path, path_name, file_name, + size, count, NULL); + if (ret < 0) { + goto error; + } + + ret = fs_handle_close(stream_fd->u.handle); + stream_fd->u.handle = NULL; + if (ret < 0) { + PERROR("Closing stream tracefile handle"); + goto error; + } + + if (should_unlink) { + unlink(path); + if (ret < 0 && errno != ENOENT) { + goto error; + } + } + + ret = utils_stream_file_name(path, path_name, file_name, + size, new_count ? *new_count : 0, NULL); + if (ret < 0) { + goto error; + } + + stream_fd->u.handle = create_fs_handle(path); + if (!stream_fd->u.handle) { + ret = -1; + goto error; + } + + ret = 0; + +error: + return ret; +} + void stream_fd_get(struct stream_fd *sf) { urcu_ref_get(&sf->ref); @@ -44,9 +253,14 @@ static void stream_fd_release(struct urcu_ref *ref) struct stream_fd *sf = caa_container_of(ref, struct stream_fd, ref); int ret; - ret = close(sf->fd); + if (sf->suspendable) { + ret = fs_handle_close(sf->u.handle); + } else { + ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &sf->u.fd, + 1, fd_tracker_util_close_fd, NULL); + } if (ret) { - PERROR("Error closing stream FD %d", sf->fd); + PERROR("Error closing stream handle"); } free(sf); } @@ -55,3 +269,15 @@ void stream_fd_put(struct stream_fd *sf) { urcu_ref_put(&sf->ref, stream_fd_release); } + +int stream_fd_get_fd(struct stream_fd *sf) +{ + return sf->suspendable ? fs_handle_get_fd(sf->u.handle) : sf->u.fd; +} + +void stream_fd_put_fd(struct stream_fd *sf) +{ + if (sf->suspendable) { + fs_handle_put_fd(sf->u.handle); + } +} diff --git a/src/bin/lttng-relayd/stream-fd.h b/src/bin/lttng-relayd/stream-fd.h index 64f3b16a5..9e4dc80b4 100644 --- a/src/bin/lttng-relayd/stream-fd.h +++ b/src/bin/lttng-relayd/stream-fd.h @@ -3,6 +3,7 @@ /* * Copyright (C) 2015 - Mathieu Desnoyers + * 2018 - Jérémie Galarneau * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -18,15 +19,19 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#include +#include -struct stream_fd { - int fd; - struct urcu_ref ref; -}; +struct stream_fd; -struct stream_fd *stream_fd_create(int fd); +struct stream_fd *stream_fd_open(const char *path); +struct stream_fd *stream_fd_create(const char *path_name, const char *file_name, + uint64_t size, uint64_t count, const char *suffix); +int stream_fd_rotate(struct stream_fd *sf, const char *path_name, + const char *file_name, uint64_t size, + uint64_t count, uint64_t *new_count); void stream_fd_get(struct stream_fd *sf); void stream_fd_put(struct stream_fd *sf); +int stream_fd_get_fd(struct stream_fd *sf); +void stream_fd_put_fd(struct stream_fd *sf); #endif /* _STREAM_FD_H */ diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index da0b3fd26..e8a9fd9b4 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -123,13 +123,8 @@ struct relay_stream *stream_create(struct ctf_trace *trace, * No need to use run_as API here because whatever we receive, * the relayd uses its own credentials for the stream files. */ - ret = utils_create_stream_file(stream->path_name, stream->channel_name, - stream->tracefile_size, 0, -1, -1, NULL); - if (ret < 0) { - ERR("Create output file"); - goto end; - } - stream->stream_fd = stream_fd_create(ret); + stream->stream_fd = stream_fd_create(stream->path_name, + stream->channel_name, stream->tracefile_size, 0, NULL); if (!stream->stream_fd) { if (close(ret)) { PERROR("Error closing file %d", ret); diff --git a/src/common/fd-tracker/fd-tracker.c b/src/common/fd-tracker/fd-tracker.c index 0331f56d0..fbf46ec18 100644 --- a/src/common/fd-tracker/fd-tracker.c +++ b/src/common/fd-tracker/fd-tracker.c @@ -797,6 +797,7 @@ int fs_handle_close(struct fs_handle *handle) pthread_mutex_lock(&handle->lock); fd_tracker_untrack(handle->tracker, handle); if (handle->fd >= 0) { + assert(!handle->in_use); /* * The return value of close() is not propagated as there * isn't much the user can do about it. diff --git a/src/common/utils.c b/src/common/utils.c index d8a78091a..5ce597f5e 100644 --- a/src/common/utils.c +++ b/src/common/utils.c @@ -712,7 +712,8 @@ int utils_mkdir_recursive(const char *path, mode_t mode, int uid, int gid) * * Return 0 on success or else a negative value. */ -static int utils_stream_file_name(char *path, +LTTNG_HIDDEN +int utils_stream_file_name(char *path, const char *path_name, const char *file_name, uint64_t size, uint64_t count, const char *suffix) @@ -831,6 +832,36 @@ error: return ret; } +LTTNG_HIDDEN +void utils_stream_file_rotation_get_new_count(uint64_t count, + uint64_t *new_count, bool *should_unlink) +{ + if (count > 0) { + /* + * In tracefile rotation, for the relay daemon we need + * to unlink the old file if present, because it may + * still be open in reading by the live thread, and we + * need to ensure that we do not overwrite the content + * between get_index and get_packet. Since we have no + * way to verify integrity of the data content compared + * to the associated index, we need to ensure the reader + * has exclusive access to the file content, and that + * the open of the data file is performed in get_index. + * Unlinking the old file rather than overwriting it + * achieves this. + */ + if (new_count) { + *new_count = (*new_count + 1) % count; + } + *should_unlink = true; + } else { + if (new_count) { + (*new_count)++; + } + *should_unlink = false; + } +} + /* * Change the output tracefile according to the given size and count The * new_count pointer is set during this operation. @@ -846,9 +877,13 @@ int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size, int *stream_fd) { int ret; + bool should_unlink; assert(stream_fd); + utils_stream_file_rotation_get_new_count(count, new_count, + &should_unlink); + ret = close(out_fd); if (ret < 0) { PERROR("Closing tracefile"); @@ -856,32 +891,12 @@ int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size, } *stream_fd = -1; - if (count > 0) { - /* - * In tracefile rotation, for the relay daemon we need - * to unlink the old file if present, because it may - * still be open in reading by the live thread, and we - * need to ensure that we do not overwrite the content - * between get_index and get_packet. Since we have no - * way to verify integrity of the data content compared - * to the associated index, we need to ensure the reader - * has exclusive access to the file content, and that - * the open of the data file is performed in get_index. - * Unlinking the old file rather than overwriting it - * achieves this. - */ - if (new_count) { - *new_count = (*new_count + 1) % count; - } + if (should_unlink) { ret = utils_unlink_stream_file(path_name, file_name, size, new_count ? *new_count : 0, uid, gid, 0); if (ret < 0 && errno != ENOENT) { goto error; } - } else { - if (new_count) { - (*new_count)++; - } } ret = utils_create_stream_file(path_name, file_name, size, diff --git a/src/common/utils.h b/src/common/utils.h index 4c7884b76..b0f04368e 100644 --- a/src/common/utils.h +++ b/src/common/utils.h @@ -22,6 +22,7 @@ #include #include #include +#include #define KIBI_LOG2 10 #define MEBI_LOG2 20 @@ -39,10 +40,15 @@ int utils_set_fd_cloexec(int fd); int utils_create_pid_file(pid_t pid, const char *filepath); int utils_mkdir(const char *path, mode_t mode, int uid, int gid); int utils_mkdir_recursive(const char *path, mode_t mode, int uid, int gid); +int utils_stream_file_name(char *path, const char *path_name, + const char *file_name, uint64_t size, uint64_t count, + const char *suffix); int utils_create_stream_file(const char *path_name, char *file_name, uint64_t size, uint64_t count, int uid, int gid, char *suffix); int utils_unlink_stream_file(const char *path_name, char *file_name, uint64_t size, uint64_t count, int uid, int gid, char *suffix); +void utils_stream_file_rotation_get_new_count(uint64_t count, + uint64_t *new_count, bool *should_unlink); int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size, uint64_t count, int uid, int gid, int out_fd, uint64_t *new_count, int *stream_fd); -- 2.34.1