if (ret < 0) {
goto error_put;
}
- ret = open(fullpath, O_RDONLY);
- if (ret < 0) {
- PERROR("Relay opening trace file");
- goto error_put;
- }
- vstream->stream_fd = stream_fd_create(ret);
+
+ vstream->stream_fd = stream_fd_open(fullpath);
if (!vstream->stream_fd) {
if (close(ret)) {
PERROR("close");
static
int viewer_get_packet(struct relay_connection *conn)
{
- int ret;
+ int ret, stream_fd;
off_t lseek_ret;
char *reply = NULL;
struct lttng_viewer_get_packet get_packet_info;
}
pthread_mutex_lock(&vstream->stream->lock);
- lseek_ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
- SEEK_SET);
+ stream_fd = stream_fd_get_fd(vstream->stream_fd);
+ if (stream_fd < 0) {
+ ERR("Failed to get viewer stream file descriptor");
+ goto error_put_fd;
+ }
+ lseek_ret = lseek(stream_fd, be64toh(get_packet_info.offset), SEEK_SET);
if (lseek_ret < 0) {
- PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd,
+ PERROR("lseek fd %d to offset %" PRIu64, stream_fd,
(uint64_t) be64toh(get_packet_info.offset));
- goto error;
+ goto error_put_fd;
}
- read_len = lttng_read(vstream->stream_fd->fd,
+ read_len = lttng_read(stream_fd,
reply + sizeof(reply_header),
packet_data_len);
if (read_len < packet_data_len) {
PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
- vstream->stream_fd->fd,
+ stream_fd,
(uint64_t) be64toh(get_packet_info.offset));
- goto error;
+ goto error_put_fd;
}
reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
reply_header.len = htobe32(packet_data_len);
goto send_reply;
+error_put_fd:
+ stream_fd_put_fd(vstream->stream_fd);
error:
reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
static
int viewer_get_metadata(struct relay_connection *conn)
{
- int ret = 0;
+ int ret = 0, stream_fd = -1;
ssize_t read_len;
uint64_t len = 0;
char *data = NULL;
if (ret < 0) {
goto error;
}
- ret = open(fullpath, O_RDONLY);
- if (ret < 0) {
- PERROR("Relay opening metadata file");
- goto error;
- }
- vstream->stream_fd = stream_fd_create(ret);
+ vstream->stream_fd = stream_fd_open(fullpath);
if (!vstream->stream_fd) {
- if (close(ret)) {
- PERROR("close");
- }
+ PERROR("Failed to open viewer stream file at %s", fullpath);
goto error;
}
}
goto error;
}
- read_len = lttng_read(vstream->stream_fd->fd, data, len);
+ stream_fd = stream_fd_get_fd(vstream->stream_fd);
+ if (stream_fd < 0) {
+ goto error_put_fd;
+ }
+ read_len = lttng_read(stream_fd, data, len);
if (read_len < len) {
PERROR("Relay reading metadata file");
- goto error;
+ goto error_put_fd;
}
vstream->metadata_sent += read_len;
if (vstream->metadata_sent == vstream->stream->metadata_received
reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
goto send_reply;
-
+error_put_fd:
+ (void) stream_fd_put_fd(vstream->stream_fd);
error:
reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
goto end_unlock;
}
- ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
- 0, 0, -1, -1, stream->stream_fd->fd, NULL,
- &stream->stream_fd->fd);
+ ret = stream_fd_rotate(stream->stream_fd,
+ stream->path_name, stream->channel_name, 0, 0, NULL);
if (ret < 0) {
ERR("Failed to rotate metadata file %s of channel %s",
stream->path_name, stream->channel_name);
struct lttcomm_relayd_metadata_payload metadata_payload_header;
struct relay_stream *metadata_stream;
uint64_t metadata_payload_size;
+ int metadata_fd = -1;
if (!session) {
ERR("Metadata sent before version check");
pthread_mutex_lock(&metadata_stream->lock);
- size_ret = lttng_write(metadata_stream->stream_fd->fd,
+ metadata_fd = stream_fd_get_fd(metadata_stream->stream_fd);
+ if (metadata_fd < 0) {
+ goto end_put;
+ }
+ size_ret = lttng_write(metadata_fd,
payload->data + sizeof(metadata_payload_header),
metadata_payload_size);
if (size_ret < metadata_payload_size) {
ERR("Relay error writing metadata on file");
ret = -1;
- goto end_put;
+ goto end_put_fd;
}
- size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+ size_ret = write_padding_to_file(metadata_fd,
metadata_payload_header.padding_size);
if (size_ret < (int64_t) metadata_payload_header.padding_size) {
ret = -1;
- goto end_put;
+ goto end_put_fd;
}
metadata_stream->metadata_received +=
DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
metadata_stream->metadata_received);
+end_put_fd:
+ stream_fd_put_fd(metadata_stream->stream_fd);
end_put:
pthread_mutex_unlock(&metadata_stream->lock);
stream_put(metadata_stream);
/* new_id is updated by utils_rotate_stream_file. */
new_id = old_id;
- ret = utils_rotate_stream_file(stream->path_name,
+ ret = stream_fd_rotate(stream->stream_fd, stream->path_name,
stream->channel_name, stream->tracefile_size,
- stream->tracefile_count, -1,
- -1, stream->stream_fd->fd,
- &new_id, &stream->stream_fd->fd);
+ stream->tracefile_count, &new_id);
if (ret < 0) {
ERR("Failed to rotate stream output file");
status = RELAY_CONNECTION_STATUS_ERROR;
bool new_stream = false, close_requested = false;
uint64_t left_to_receive = state->left_to_receive;
struct relay_session *session;
+ int stream_fd = -1;
DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->header.net_seq_num,
}
}
+ stream_fd = stream_fd_get_fd(stream->stream_fd);
+ if (stream_fd < 0) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
+
/*
* The size of the "chunk" received on any iteration is bounded by:
* - the data left to receive,
PERROR("Socket %d error", conn->sock->fd);
status = RELAY_CONNECTION_STATUS_ERROR;
}
- goto end_stream_unlock;
+ goto end_put_fd;
} else if (ret == 0) {
/* No more data ready to be consumed on socket. */
DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
recv_size = ret;
/* Write data to stream output fd. */
- write_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+ write_ret = lttng_write(stream_fd, data_buffer,
recv_size);
if (write_ret < (ssize_t) recv_size) {
ERR("Relay error writing data to file");
status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
+ goto end_put_fd;
}
left_to_receive -= recv_size;
DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->received,
state->left_to_receive);
- goto end_stream_unlock;
+ goto end_put_fd;
}
- ret = write_padding_to_file(stream->stream_fd->fd,
+ ret = write_padding_to_file(stream_fd,
state->header.padding_size);
if ((int64_t) ret < (int64_t) state->header.padding_size) {
ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
+ goto end_put_fd;
}
stream->stream_handle,
state->header.net_seq_num, ret);
status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
+ goto end_put_fd;
}
}
connection_reset_protocol_state(conn);
state = NULL;
+end_put_fd:
+ stream_fd_put_fd(stream->stream_fd);
end_stream_unlock:
close_requested = stream->close_requested;
pthread_mutex_unlock(&stream->lock);
/*
* Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
*/
#define _LGPL_SOURCE
+
+#include <urcu/ref.h>
+#include <sys/stat.h>
+#include <fcntl.h>
#include <common/common.h>
+#include <common/fd-tracker/fd-tracker.h>
+#include <common/fd-tracker/utils.h>
+#include <common/utils.h>
#include "stream-fd.h"
+#include "lttng-relayd.h"
+
+struct stream_fd {
+ bool suspendable;
+ union {
+ /* Suspendable. */
+ struct fs_handle *handle;
+ /* Unsuspendable. */
+ int fd;
+ } u;
+ struct urcu_ref ref;
+};
-struct stream_fd *stream_fd_create(int fd)
+static struct stream_fd *_stream_fd_alloc(void)
{
struct stream_fd *sf;
goto end;
}
urcu_ref_init(&sf->ref);
- sf->fd = fd;
end:
return sf;
}
+static struct stream_fd *stream_fd_suspendable_create(struct fs_handle *handle)
+{
+ struct stream_fd *stream_fd = _stream_fd_alloc();
+
+ if (!stream_fd) {
+ goto end;
+ }
+
+ stream_fd->suspendable = true;
+ stream_fd->u.handle = handle;
+end:
+ return stream_fd;
+}
+
+static struct stream_fd *stream_fd_unsuspendable_create(int fd)
+{
+ struct stream_fd *stream_fd = _stream_fd_alloc();
+
+ if (!stream_fd) {
+ goto end;
+ }
+
+ stream_fd->suspendable = false;
+ stream_fd->u.fd = fd;
+end:
+ return stream_fd;
+}
+
+static int open_file(void *data, int *out_fd)
+{
+ int ret;
+ const char *path = data;
+
+ ret = open(path, O_RDONLY);
+ if (ret < 0) {
+ goto end;
+ }
+ *out_fd = ret;
+ ret = 0;
+end:
+ return ret;
+}
+
+/*
+ * Stream files are opened (read-only) on the live end of the relayd.
+ * In live mode, it is expected that a client is able to consume a
+ * complete file even if it is replaced (in file rotation mode).
+ *
+ * Thus, it is not possible to open those files as suspendable file
+ * handles. This means that live clients can keep a large number of
+ * open file descriptors. As a work-around, we could create hard links
+ * to the files to make the files suspendable. The original file would be
+ * replaced, but the viewer's hard-link would ensure that the inode is
+ * still available for restoration.
+ *
+ * The main roadblock to this approach is validating that the trace
+ * directory resides in a filesystem that supports hard-links. Otherwise,
+ * a cooperative mechanism could allow the viewer end to mark a file as
+ * being in use and it could be renamed rather than unlinked by the
+ * receiving end.
+ */
+struct stream_fd *stream_fd_open(const char *path)
+{
+ int ret, fd;
+ struct stream_fd *stream_fd = NULL;
+
+ ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &fd,
+ (const char **) &path, 1,
+ open_file, (void *) path);
+ if (ret) {
+ goto end;
+ }
+
+ stream_fd = stream_fd_unsuspendable_create(fd);
+ if (!stream_fd) {
+ (void) fd_tracker_close_unsuspendable_fd(the_fd_tracker, &fd, 1,
+ fd_tracker_util_close_fd, NULL);
+ }
+end:
+ return stream_fd;
+}
+
+static
+struct fs_handle *create_fs_handle(const char *path)
+{
+ struct fs_handle *handle;
+ /*
+ * With the session rotation feature on the relay, we might need to seek
+ * and truncate a tracefile, so we need read and write access.
+ */
+ int flags = O_RDWR | O_CREAT | O_TRUNC;
+ /* Open with 660 mode */
+ mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+
+ handle = fd_tracker_open_fs_handle(the_fd_tracker, path, flags, &mode);
+ if (!handle) {
+ ERR("Failed to open fs handle to %s", path);
+ }
+
+ return handle;
+}
+
+/*
+ * Stream file are created by on the consumerd/data-reception end. Those
+ * stream fds can be suspended as there is no expectation that the files
+ * will be unlinked and then need to be appended-to.
+ *
+ * Hence, the file descriptors are created as suspendable to allow the
+ * fd-tracker to reduce the number of active fds..
+ */
+struct stream_fd *stream_fd_create(const char *path_name, const char *file_name,
+ uint64_t size, uint64_t count, const char *suffix)
+{
+ struct stream_fd *stream_fd = NULL;
+ struct fs_handle *handle;
+ int ret;
+ char path[PATH_MAX];
+
+ ret = utils_stream_file_name(path, path_name, file_name,
+ size, count, suffix);
+ if (ret < 0) {
+ goto end;
+ }
+
+ handle = create_fs_handle(path);
+ if (!handle) {
+ goto end;
+ }
+
+ stream_fd = stream_fd_suspendable_create(handle);
+ if (!stream_fd) {
+ (void) fs_handle_close(handle);
+ }
+
+end:
+ return stream_fd;
+}
+
+int stream_fd_rotate(struct stream_fd *stream_fd, const char *path_name,
+ const char *file_name, uint64_t size,
+ uint64_t count, uint64_t *new_count)
+{
+ int ret;
+ bool should_unlink;
+ char path[PATH_MAX];
+
+ assert(stream_fd);
+ assert(stream_fd->suspendable);
+
+ utils_stream_file_rotation_get_new_count(count, new_count,
+ &should_unlink);
+
+ ret = utils_stream_file_name(path, path_name, file_name,
+ size, count, NULL);
+ if (ret < 0) {
+ goto error;
+ }
+
+ ret = fs_handle_close(stream_fd->u.handle);
+ stream_fd->u.handle = NULL;
+ if (ret < 0) {
+ PERROR("Closing stream tracefile handle");
+ goto error;
+ }
+
+ if (should_unlink) {
+ unlink(path);
+ if (ret < 0 && errno != ENOENT) {
+ goto error;
+ }
+ }
+
+ ret = utils_stream_file_name(path, path_name, file_name,
+ size, new_count ? *new_count : 0, NULL);
+ if (ret < 0) {
+ goto error;
+ }
+
+ stream_fd->u.handle = create_fs_handle(path);
+ if (!stream_fd->u.handle) {
+ ret = -1;
+ goto error;
+ }
+
+ ret = 0;
+
+error:
+ return ret;
+}
+
void stream_fd_get(struct stream_fd *sf)
{
urcu_ref_get(&sf->ref);
struct stream_fd *sf = caa_container_of(ref, struct stream_fd, ref);
int ret;
- ret = close(sf->fd);
+ if (sf->suspendable) {
+ ret = fs_handle_close(sf->u.handle);
+ } else {
+ ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &sf->u.fd,
+ 1, fd_tracker_util_close_fd, NULL);
+ }
if (ret) {
- PERROR("Error closing stream FD %d", sf->fd);
+ PERROR("Error closing stream handle");
}
free(sf);
}
{
urcu_ref_put(&sf->ref, stream_fd_release);
}
+
+int stream_fd_get_fd(struct stream_fd *sf)
+{
+ return sf->suspendable ? fs_handle_get_fd(sf->u.handle) : sf->u.fd;
+}
+
+void stream_fd_put_fd(struct stream_fd *sf)
+{
+ if (sf->suspendable) {
+ fs_handle_put_fd(sf->u.handle);
+ }
+}
/*
* Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#include <urcu/ref.h>
+#include <stdint.h>
-struct stream_fd {
- int fd;
- struct urcu_ref ref;
-};
+struct stream_fd;
-struct stream_fd *stream_fd_create(int fd);
+struct stream_fd *stream_fd_open(const char *path);
+struct stream_fd *stream_fd_create(const char *path_name, const char *file_name,
+ uint64_t size, uint64_t count, const char *suffix);
+int stream_fd_rotate(struct stream_fd *sf, const char *path_name,
+ const char *file_name, uint64_t size,
+ uint64_t count, uint64_t *new_count);
void stream_fd_get(struct stream_fd *sf);
void stream_fd_put(struct stream_fd *sf);
+int stream_fd_get_fd(struct stream_fd *sf);
+void stream_fd_put_fd(struct stream_fd *sf);
#endif /* _STREAM_FD_H */
* No need to use run_as API here because whatever we receive,
* the relayd uses its own credentials for the stream files.
*/
- ret = utils_create_stream_file(stream->path_name, stream->channel_name,
- stream->tracefile_size, 0, -1, -1, NULL);
- if (ret < 0) {
- ERR("Create output file");
- goto end;
- }
- stream->stream_fd = stream_fd_create(ret);
+ stream->stream_fd = stream_fd_create(stream->path_name,
+ stream->channel_name, stream->tracefile_size, 0, NULL);
if (!stream->stream_fd) {
if (close(ret)) {
PERROR("Error closing file %d", ret);
pthread_mutex_lock(&handle->lock);
fd_tracker_untrack(handle->tracker, handle);
if (handle->fd >= 0) {
+ assert(!handle->in_use);
/*
* The return value of close() is not propagated as there
* isn't much the user can do about it.
*
* Return 0 on success or else a negative value.
*/
-static int utils_stream_file_name(char *path,
+LTTNG_HIDDEN
+int utils_stream_file_name(char *path,
const char *path_name, const char *file_name,
uint64_t size, uint64_t count,
const char *suffix)
return ret;
}
+LTTNG_HIDDEN
+void utils_stream_file_rotation_get_new_count(uint64_t count,
+ uint64_t *new_count, bool *should_unlink)
+{
+ if (count > 0) {
+ /*
+ * In tracefile rotation, for the relay daemon we need
+ * to unlink the old file if present, because it may
+ * still be open in reading by the live thread, and we
+ * need to ensure that we do not overwrite the content
+ * between get_index and get_packet. Since we have no
+ * way to verify integrity of the data content compared
+ * to the associated index, we need to ensure the reader
+ * has exclusive access to the file content, and that
+ * the open of the data file is performed in get_index.
+ * Unlinking the old file rather than overwriting it
+ * achieves this.
+ */
+ if (new_count) {
+ *new_count = (*new_count + 1) % count;
+ }
+ *should_unlink = true;
+ } else {
+ if (new_count) {
+ (*new_count)++;
+ }
+ *should_unlink = false;
+ }
+}
+
/*
* Change the output tracefile according to the given size and count The
* new_count pointer is set during this operation.
int *stream_fd)
{
int ret;
+ bool should_unlink;
assert(stream_fd);
+ utils_stream_file_rotation_get_new_count(count, new_count,
+ &should_unlink);
+
ret = close(out_fd);
if (ret < 0) {
PERROR("Closing tracefile");
}
*stream_fd = -1;
- if (count > 0) {
- /*
- * In tracefile rotation, for the relay daemon we need
- * to unlink the old file if present, because it may
- * still be open in reading by the live thread, and we
- * need to ensure that we do not overwrite the content
- * between get_index and get_packet. Since we have no
- * way to verify integrity of the data content compared
- * to the associated index, we need to ensure the reader
- * has exclusive access to the file content, and that
- * the open of the data file is performed in get_index.
- * Unlinking the old file rather than overwriting it
- * achieves this.
- */
- if (new_count) {
- *new_count = (*new_count + 1) % count;
- }
+ if (should_unlink) {
ret = utils_unlink_stream_file(path_name, file_name, size,
new_count ? *new_count : 0, uid, gid, 0);
if (ret < 0 && errno != ENOENT) {
goto error;
}
- } else {
- if (new_count) {
- (*new_count)++;
- }
}
ret = utils_create_stream_file(path_name, file_name, size,
#include <unistd.h>
#include <stdint.h>
#include <getopt.h>
+#include <stdbool.h>
#define KIBI_LOG2 10
#define MEBI_LOG2 20
int utils_create_pid_file(pid_t pid, const char *filepath);
int utils_mkdir(const char *path, mode_t mode, int uid, int gid);
int utils_mkdir_recursive(const char *path, mode_t mode, int uid, int gid);
+int utils_stream_file_name(char *path, const char *path_name,
+ const char *file_name, uint64_t size, uint64_t count,
+ const char *suffix);
int utils_create_stream_file(const char *path_name, char *file_name, uint64_t size,
uint64_t count, int uid, int gid, char *suffix);
int utils_unlink_stream_file(const char *path_name, char *file_name, uint64_t size,
uint64_t count, int uid, int gid, char *suffix);
+void utils_stream_file_rotation_get_new_count(uint64_t count,
+ uint64_t *new_count, bool *should_unlink);
int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size,
uint64_t count, int uid, int gid, int out_fd, uint64_t *new_count,
int *stream_fd);