From f465e9787f5b70d4a8f3761c60b8d6e4161e9628 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Sat, 30 Jun 2018 23:20:43 -0400 Subject: [PATCH] Backport: relayd: replace lttng_index_file with relay_index_file MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit lttng_index_file is shared between the consumer and relay daemon. However, the introduction of the fd-tracker in the relay daemon makes it hard to cleanly share this piece of code between both daemons. The ctf-index.h header is still shared by both daemons which is the most important part. The lttng/relay_index_file class is a fairly thin wrapper around file system operations (unlink, read, and write an index) so there is little value gained in sharing the code vs heavily modifying it to handle the presence of an fd-tracker in the process. Signed-off-by: Jérémie Galarneau --- src/bin/lttng-relayd/Makefile.am | 3 +- src/bin/lttng-relayd/index-file.c | 388 +++++++++++++++++++++++++++ src/bin/lttng-relayd/index-file.h | 47 ++++ src/bin/lttng-relayd/index.c | 16 +- src/bin/lttng-relayd/index.h | 7 +- src/bin/lttng-relayd/live.c | 9 +- src/bin/lttng-relayd/main.c | 6 +- src/bin/lttng-relayd/stream.c | 2 +- src/bin/lttng-relayd/stream.h | 3 +- src/bin/lttng-relayd/viewer-stream.c | 16 +- src/bin/lttng-relayd/viewer-stream.h | 3 +- 11 files changed, 468 insertions(+), 32 deletions(-) create mode 100644 src/bin/lttng-relayd/index-file.c create mode 100644 src/bin/lttng-relayd/index-file.h diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am index 9fdadbcf5..f8ba4ead6 100644 --- a/src/bin/lttng-relayd/Makefile.am +++ b/src/bin/lttng-relayd/Makefile.am @@ -19,7 +19,8 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \ connection.c connection.h \ viewer-session.c viewer-session.h \ tracefile-array.c tracefile-array.h \ - tcp_keep_alive.c tcp_keep_alive.h + tcp_keep_alive.c tcp_keep_alive.h \ + index-file.c index-file.h # link on liblttngctl for check if relayd is already alive. lttng_relayd_LDADD = -lurcu-common -lurcu \ diff --git a/src/bin/lttng-relayd/index-file.c b/src/bin/lttng-relayd/index-file.c new file mode 100644 index 000000000..010d0ad9c --- /dev/null +++ b/src/bin/lttng-relayd/index-file.c @@ -0,0 +1,388 @@ +/* + * Copyright (C) 2018 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "index-file.h" +#include "lttng-relayd.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +struct relay_index_file { + bool suspendable; + union { + /* Suspendable. */ + struct fs_handle *handle; + /* Unsuspendable. */ + int fd; + } u; + uint32_t major; + uint32_t minor; + uint32_t element_len; + struct urcu_ref ref; +}; + +/* + * Create the index file associated with a trace file. + * + * Return allocated struct lttng_index_file, NULL on error. + */ +struct relay_index_file *relay_index_file_create(const char *path_name, + const char *stream_name, uint64_t size, uint64_t count, + uint32_t idx_major, uint32_t idx_minor) +{ + struct relay_index_file *index_file; + struct fs_handle *fs_handle = NULL; + int ret, fd = -1; + ssize_t size_ret; + struct ctf_packet_index_file_hdr hdr; + char idx_dir_path[LTTNG_PATH_MAX]; + char idx_file_path[LTTNG_PATH_MAX]; + /* + * With the session rotation feature on the relay, we might need to seek + * and truncate a tracefile, so we need read and write access. + */ + int flags = O_RDWR | O_CREAT | O_TRUNC; + /* Open with 660 mode */ + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; + + index_file = zmalloc(sizeof(*index_file)); + if (!index_file) { + PERROR("allocating relay_index_file"); + goto error; + } + + /* + * The receiving end of the relay daemon is not expected to try + * to append to an index file. It is thus safe to create it as + * suspendable. + */ + index_file->suspendable = true; + + ret = snprintf(idx_dir_path, sizeof(idx_dir_path), "%s/" DEFAULT_INDEX_DIR, + path_name); + if (ret < 0) { + PERROR("snprintf index path"); + goto error; + } + + /* Create index directory if necessary. */ + ret = utils_mkdir(idx_dir_path, S_IRWXU | S_IRWXG, -1, -1); + if (ret < 0) { + if (errno != EEXIST) { + PERROR("Index trace directory creation error"); + goto error; + } + } + + ret = utils_stream_file_name(idx_file_path, idx_dir_path, stream_name, + size, count, DEFAULT_INDEX_FILE_SUFFIX); + if (ret < 0) { + ERR("Could not build path of index file"); + goto error; + } + + /* + * For tracefile rotation. We need to unlink the old + * file if present to synchronize with the tail of the + * live viewer which could be working on this same file. + * By doing so, any reference to the old index file + * stays valid even if we re-create a new file with the + * same name afterwards. + */ + unlink(idx_file_path); + if (ret < 0 && errno != ENOENT) { + PERROR("Failed to unlink index file"); + goto error; + } + + fs_handle = fd_tracker_open_fs_handle(the_fd_tracker, idx_file_path, + flags, &mode); + if (!fs_handle) { + goto error; + } + index_file->u.handle = fs_handle; + + fd = fs_handle_get_fd(fs_handle); + if (fd < 0) { + goto error; + } + + ctf_packet_index_file_hdr_init(&hdr, idx_major, idx_minor); + size_ret = lttng_write(fd, &hdr, sizeof(hdr)); + if (size_ret < sizeof(hdr)) { + PERROR("write index header"); + goto error; + } + + index_file->major = idx_major; + index_file->minor = idx_minor; + index_file->element_len = ctf_packet_index_len(idx_major, idx_minor); + urcu_ref_init(&index_file->ref); + + fs_handle_put_fd(fs_handle); + + return index_file; + +error: + if (fd >= 0) { + fs_handle_put_fd(fs_handle); + } + if (fs_handle) { + int close_ret; + + close_ret = fs_handle_close(fs_handle); + if (close_ret < 0) { + PERROR("Failed to close index filesystem handle"); + } + } + free(index_file); + return NULL; +} + +static +int open_file(void *data, int *out_fd) +{ + int ret; + const char *path = data; + + ret = open(path, O_RDONLY); + if (ret < 0) { + goto end; + } + *out_fd = ret; + ret = 0; +end: + return ret; +} + +struct relay_index_file *relay_index_file_open(const char *path_name, + const char *channel_name, uint64_t tracefile_count, + uint64_t tracefile_count_current) +{ + struct relay_index_file *index_file; + int ret, fd; + ssize_t read_len; + char fullpath[PATH_MAX]; + char *path_param = fullpath; + struct ctf_packet_index_file_hdr hdr; + uint32_t major, minor, element_len; + + assert(path_name); + assert(channel_name); + + index_file = zmalloc(sizeof(*index_file)); + if (!index_file) { + PERROR("Failed to allocate relay_index_file"); + goto error; + } + + index_file->suspendable = false; + + if (tracefile_count > 0) { + ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s_%" + PRIu64 DEFAULT_INDEX_FILE_SUFFIX, path_name, + channel_name, tracefile_count_current); + } else { + ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s" + DEFAULT_INDEX_FILE_SUFFIX, path_name, channel_name); + } + if (ret < 0) { + PERROR("Failed to build index path"); + goto error; + } + + DBG("Index opening file %s in read only", fullpath); + ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &fd, + (const char **) &path_param, 1, + open_file, (void *) fullpath); + if (ret < 0) { + PERROR("Failed to open index file at %s", fullpath); + goto error; + } + + read_len = lttng_read(fd, &hdr, sizeof(hdr)); + if (read_len < 0) { + PERROR("Failed to read index header"); + goto error_close; + } + + if (be32toh(hdr.magic) != CTF_INDEX_MAGIC) { + ERR("Invalid header magic %#010x, expected %#010x", + be32toh(hdr.magic), CTF_INDEX_MAGIC); + goto error_close; + } + major = be32toh(hdr.index_major); + minor = be32toh(hdr.index_minor); + element_len = be32toh(hdr.packet_index_len); + + if (major != CTF_INDEX_MAJOR) { + ERR("Invalid header version, major = %" PRIu32 ", expected %i", + major, CTF_INDEX_MAJOR); + goto error_close; + } + if (element_len > sizeof(struct ctf_packet_index)) { + ERR("Index element length too long (%" PRIu32 " bytes)", + element_len); + goto error_close; + } + + index_file->u.fd = fd; + index_file->major = major; + index_file->minor = minor; + index_file->element_len = element_len; + urcu_ref_init(&index_file->ref); + + return index_file; + +error_close: + ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &fd, + 1, fd_tracker_util_close_fd, NULL); + if (ret < 0) { + PERROR("Failed to close index fd %d", fd); + } + +error: + free(index_file); + return NULL; +} + +int relay_index_file_write(const struct relay_index_file *index_file, + const struct ctf_packet_index *element) +{ + int fd, ret; + ssize_t write_ret; + + assert(index_file); + assert(element); + + fd = index_file->suspendable ? + fs_handle_get_fd(index_file->u.handle) : + index_file->u.fd; + if (fd < 0) { + ret = fd; + goto end; + } + + write_ret = lttng_write(fd, element, index_file->element_len); + if (write_ret < index_file->element_len) { + PERROR("Failed to write packet index to index file"); + ret = -1; + } + ret = 0; + + if (index_file->suspendable) { + fs_handle_put_fd(index_file->u.handle); + } +end: + return ret; +} + +int relay_index_file_read(const struct relay_index_file *index_file, + struct ctf_packet_index *element) +{ + int fd, ret; + ssize_t read_ret; + + assert(index_file); + assert(element); + + fd = index_file->suspendable ? + fs_handle_get_fd(index_file->u.handle) : + index_file->u.fd; + if (fd < 0) { + ret = fd; + goto end; + } + + read_ret = lttng_read(fd, element, index_file->element_len); + if (read_ret < index_file->element_len) { + PERROR("Failed to read packet index from file"); + ret = -1; + } + ret = 0; + + if (index_file->suspendable) { + fs_handle_put_fd(index_file->u.handle); + } +end: + return ret; +} + +int relay_index_file_seek_end(struct relay_index_file *index_file) +{ + int fd, ret = 0; + off_t lseek_ret; + + fd = index_file->suspendable ? + fs_handle_get_fd(index_file->u.handle) : + index_file->u.fd; + if (fd < 0) { + ret = fd; + goto end; + } + + lseek_ret = lseek(fd, 0, SEEK_END); + if (lseek_ret < 0) { + ret = lseek_ret; + } + + if (index_file->suspendable) { + fs_handle_put_fd(index_file->u.handle); + } +end: + return ret; +} + +void relay_index_file_get(struct relay_index_file *index_file) +{ + urcu_ref_get(&index_file->ref); +} + +static +void relay_index_file_release(struct urcu_ref *ref) +{ + int ret; + struct relay_index_file *index_file = caa_container_of(ref, + struct relay_index_file, ref); + + if (index_file->suspendable) { + ret = fs_handle_close(index_file->u.handle); + } else { + ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &index_file->u.fd, + 1, fd_tracker_util_close_fd, NULL); + } + if (ret < 0) { + PERROR("Failed to close index file"); + } + free(index_file); +} + +void relay_index_file_put(struct relay_index_file *index_file) +{ + urcu_ref_put(&index_file->ref, relay_index_file_release); +} diff --git a/src/bin/lttng-relayd/index-file.h b/src/bin/lttng-relayd/index-file.h new file mode 100644 index 000000000..fa34f1319 --- /dev/null +++ b/src/bin/lttng-relayd/index-file.h @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2018 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef RELAY_INDEX_FILE_H +#define RELAY_INDEX_FILE_H + +#include +#include + +struct relay_index_file; + +/* + * create and open have refcount of 1. Use put to decrement the + * refcount. Destroys when reaching 0. Use "get" to increment refcount. + */ +struct relay_index_file *relay_index_file_create(const char *path_name, + const char *stream_name, uint64_t size, + uint64_t count, uint32_t major, uint32_t minor); +struct relay_index_file *relay_index_file_open(const char *path_name, + const char *channel_name, uint64_t tracefile_count, + uint64_t tracefile_count_current); + +int relay_index_file_write(const struct relay_index_file *index_file, + const struct ctf_packet_index *element); +int relay_index_file_read(const struct relay_index_file *index_file, + struct ctf_packet_index *element); + +int relay_index_file_seek_end(struct relay_index_file *index_file); + +void relay_index_file_get(struct relay_index_file *index_file); +void relay_index_file_put(struct relay_index_file *index_file); + +#endif /* RELAY_INDEX_FILE_H */ diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c index b15bbcd77..fa5b373c8 100644 --- a/src/bin/lttng-relayd/index.c +++ b/src/bin/lttng-relayd/index.c @@ -167,7 +167,7 @@ end: } int relay_index_set_file(struct relay_index *index, - struct lttng_index_file *index_file, + struct relay_index_file *index_file, uint64_t data_offset) { int ret = 0; @@ -177,7 +177,7 @@ int relay_index_set_file(struct relay_index *index, ret = -1; goto end; } - lttng_index_file_get(index_file); + relay_index_file_get(index_file); index->index_file = index_file; index->index_data.offset = data_offset; end: @@ -230,7 +230,7 @@ static void index_release(struct urcu_ref *ref) struct lttng_ht_iter iter; if (index->index_file) { - lttng_index_file_put(index->index_file); + relay_index_file_put(index->index_file); index->index_file = NULL; } if (index->in_hash_table) { @@ -284,7 +284,6 @@ int relay_index_try_flush(struct relay_index *index) { int ret = 1; bool flushed = false; - int fd; pthread_mutex_lock(&index->lock); if (index->flushed) { @@ -294,13 +293,12 @@ int relay_index_try_flush(struct relay_index *index) if (!index->has_index_data || !index->index_file) { goto skip; } - fd = index->index_file->fd; - DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64 - " on fd %d", index->stream->stream_handle, - index->index_n.key, fd); + DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64, + index->stream->stream_handle, + index->index_n.key); flushed = true; index->flushed = true; - ret = lttng_index_file_write(index->index_file, &index->index_data); + ret = relay_index_file_write(index->index_file, &index->index_data); skip: pthread_mutex_unlock(&index->lock); diff --git a/src/bin/lttng-relayd/index.h b/src/bin/lttng-relayd/index.h index 80fe86ab0..42d7a5343 100644 --- a/src/bin/lttng-relayd/index.h +++ b/src/bin/lttng-relayd/index.h @@ -22,10 +22,11 @@ #include #include +#include #include -#include +#include "index-file.h" #include "stream-fd.h" struct relay_stream; @@ -43,7 +44,7 @@ struct relay_index { * index file on which to write the index data. May differ from * stream->index_file due to tracefile rotation. */ - struct lttng_index_file *index_file; + struct relay_index_file *index_file; /* Index packet data. This is the data that is written on disk. */ struct ctf_packet_index index_data; @@ -65,7 +66,7 @@ struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream, uint64_t net_seq_num); void relay_index_put(struct relay_index *index); int relay_index_set_file(struct relay_index *index, - struct lttng_index_file *index_file, + struct relay_index_file *index_file, uint64_t data_offset); int relay_index_set_data(struct relay_index *index, const struct ctf_packet_index *data); diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 03b4fa1c7..9bb719ef6 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -47,7 +47,6 @@ #include #include #include -#include #include #include #include @@ -67,6 +66,7 @@ #include "ctf-trace.h" #include "connection.h" #include "viewer-session.h" +#include "index-file.h" #define SESSION_BUF_DEFAULT_COUNT 16 @@ -1231,7 +1231,7 @@ static int try_open_index(struct relay_viewer_stream *vstream, ret = -ENOENT; goto end; } - vstream->index_file = lttng_index_file_open(vstream->path_name, + vstream->index_file = relay_index_file_open(vstream->path_name, vstream->channel_name, vstream->stream->tracefile_count, vstream->current_tracefile_id); @@ -1473,10 +1473,9 @@ int viewer_get_next_index(struct relay_connection *conn) viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; } - ret = lttng_index_file_read(vstream->index_file, &packet_index); + ret = relay_index_file_read(vstream->index_file, &packet_index); if (ret) { - ERR("Relay error reading index file %d", - vstream->index_file->fd); + ERR("Relay error reading index file"); viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); goto send_reply; } else { diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 0f584487a..d1ee84eae 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -2617,14 +2617,14 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, /* Put ref on previous index_file. */ if (stream->index_file) { - lttng_index_file_put(stream->index_file); + relay_index_file_put(stream->index_file); stream->index_file = NULL; } major = stream->trace->session->major; minor = stream->trace->session->minor; - stream->index_file = lttng_index_file_create(stream->path_name, + stream->index_file = relay_index_file_create(stream->path_name, stream->channel_name, - -1, -1, stream->tracefile_size, + stream->tracefile_size, tracefile_array_get_file_index_head(stream->tfa), lttng_to_index_major(major, minor), lttng_to_index_minor(major, minor)); diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index e8a9fd9b4..b033dda94 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -302,7 +302,7 @@ static void stream_release(struct urcu_ref *ref) stream->stream_fd = NULL; } if (stream->index_file) { - lttng_index_file_put(stream->index_file); + relay_index_file_put(stream->index_file); stream->index_file = NULL; } if (stream->trace) { diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h index d471c7b7f..e69a255ba 100644 --- a/src/bin/lttng-relayd/stream.h +++ b/src/bin/lttng-relayd/stream.h @@ -30,6 +30,7 @@ #include "session.h" #include "stream-fd.h" #include "tracefile-array.h" +#include "index-file.h" /* * Represents a stream in the relay @@ -59,7 +60,7 @@ struct relay_stream { /* FD on which to write the stream data. */ struct stream_fd *stream_fd; /* index file on which to write the index data. */ - struct lttng_index_file *index_file; + struct relay_index_file *index_file; char *path_name; char *channel_name; diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c index 8b0d6ab84..2a580ca53 100644 --- a/src/bin/lttng-relayd/viewer-stream.c +++ b/src/bin/lttng-relayd/viewer-stream.c @@ -19,11 +19,11 @@ #define _LGPL_SOURCE #include -#include #include #include "lttng-relayd.h" #include "viewer-stream.h" +#include "index-file.h" static void viewer_stream_destroy(struct relay_viewer_stream *vstream) { @@ -118,7 +118,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, if (stream->index_received_seqcount == 0) { vstream->index_file = NULL; } else { - vstream->index_file = lttng_index_file_open(vstream->path_name, + vstream->index_file = relay_index_file_open(vstream->path_name, vstream->channel_name, stream->tracefile_count, vstream->current_tracefile_id); @@ -128,10 +128,10 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, } if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) { - off_t lseek_ret; + int ret; - lseek_ret = lseek(vstream->index_file->fd, 0, SEEK_END); - if (lseek_ret < 0) { + ret = relay_index_file_seek_end(vstream->index_file); + if (ret < 0) { goto error_unlock; } } @@ -184,7 +184,7 @@ static void viewer_stream_release(struct urcu_ref *ref) vstream->stream_fd = NULL; } if (vstream->index_file) { - lttng_index_file_put(vstream->index_file); + relay_index_file_put(vstream->index_file); vstream->index_file = NULL; } if (vstream->stream) { @@ -297,7 +297,7 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream) } if (vstream->index_file) { - lttng_index_file_put(vstream->index_file); + relay_index_file_put(vstream->index_file); vstream->index_file = NULL; } if (vstream->stream_fd) { @@ -305,7 +305,7 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream) vstream->stream_fd = NULL; } - vstream->index_file = lttng_index_file_open(vstream->path_name, + vstream->index_file = relay_index_file_open(vstream->path_name, vstream->channel_name, stream->tracefile_count, vstream->current_tracefile_id); diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h index 2514b1722..1a390964d 100644 --- a/src/bin/lttng-relayd/viewer-stream.h +++ b/src/bin/lttng-relayd/viewer-stream.h @@ -29,6 +29,7 @@ #include "ctf-trace.h" #include "lttng-viewer-abi.h" #include "stream.h" +#include "index-file.h" struct relay_stream; @@ -53,7 +54,7 @@ struct relay_viewer_stream { /* FD from which to read the stream data. */ struct stream_fd *stream_fd; /* index file from which to read the index data. */ - struct lttng_index_file *index_file; + struct relay_index_file *index_file; char *path_name; char *channel_name; -- 2.34.1