X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=00bb7f7212cac7f0ccf4f4a6c91ef80dbb4b58ef;hb=6f94560a050962daad560dac2823bd97b0b1bf98;hp=c1ba1405ea478931a9cd3dee4ed57399ec2c5cd7;hpb=87dc6a9c2c936cf4386043083412c695a914cb36;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index c1ba1405e..00bb7f721 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -2,19 +2,18 @@ * Copyright (C) 2011 - Julien Desfossez * Mathieu Desnoyers * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only version 2 - * of the License. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #define _GNU_SOURCE @@ -32,7 +31,9 @@ #include #include #include +#include #include +#include #include "kernel-consumer.h" @@ -45,45 +46,118 @@ extern volatile int consumer_quit; * * Returns the number of bytes written */ -int lttng_kconsumer_on_read_subbuffer_mmap( +ssize_t lttng_kconsumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len) { unsigned long mmap_offset; - long ret = 0; + ssize_t ret = 0, written = 0; off_t orig_offset = stream->out_fd_offset; int fd = stream->wait_fd; + /* Default is on the disk */ int outfd = stream->out_fd; + uint64_t metadata_id; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* Flag that the current stream if set for network streaming. */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + goto end; + } + } /* get the offset inside the fd to mmap */ ret = kernctl_get_mmap_read_offset(fd, &mmap_offset); if (ret != 0) { errno = -ret; perror("kernctl_get_mmap_read_offset"); + written = ret; goto end; } + /* RCU lock for the relayd pointer */ + rcu_read_lock(); + + /* Handle stream on the relayd if the output is on the network */ + if (relayd) { + /* + * Lock the control socket for the complete duration of the function + * since from this point on we will use the socket. + */ + if (stream->metadata_flag) { + /* Metadata requires the control socket. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + } + + ret = consumer_handle_stream_before_relayd(stream, len); + if (ret >= 0) { + /* Use the returned socket. */ + outfd = ret; + + /* Write metadata stream id before payload */ + if (stream->metadata_flag) { + metadata_id = htobe64(stream->relayd_stream_id); + do { + ret = write(outfd, (void *) &metadata_id, + sizeof(stream->relayd_stream_id)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write metadata stream id"); + written = ret; + goto end; + } + DBG("Metadata stream id %zu written before data", + stream->relayd_stream_id); + /* + * We do this so the return value can match the len passed as + * argument to this function. + */ + written -= sizeof(stream->relayd_stream_id); + } + } + /* Else, use the default set before which is the filesystem. */ + } + while (len > 0) { - ret = write(outfd, stream->mmap_base + mmap_offset, len); - if (ret >= len) { - len = 0; - } else if (ret < 0) { - errno = -ret; + do { + ret = write(outfd, stream->mmap_base + mmap_offset, len); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + perror("Error in file write"); + if (written == 0) { + written = ret; + } + goto end; + } else if (ret > len) { perror("Error in file write"); + written += ret; goto end; + } else { + len -= ret; + mmap_offset += ret; } - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; - } + /* This call is useless on a socket so better save a syscall. */ + if (!relayd) { + /* This won't block, but will start writeout asynchronously */ + lttng_sync_file_range(outfd, stream->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret; + } + written += ret; + } lttng_consumer_sync_trace_file(stream, orig_offset); - goto end; - end: - return ret; + /* Unlock only if ctrl socket used */ + if (relayd && stream->metadata_flag) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } + + rcu_read_unlock(); + + return written; } /* @@ -91,49 +165,134 @@ end: * * Returns the number of bytes spliced. */ -int lttng_kconsumer_on_read_subbuffer_splice( +ssize_t lttng_kconsumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len) { - long ret = 0; + ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; off_t orig_offset = stream->out_fd_offset; int fd = stream->wait_fd; + /* Default is on the disk */ int outfd = stream->out_fd; + uint64_t metadata_id; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* Flag that the current stream if set for network streaming. */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + goto end; + } + } + + /* RCU lock for the relayd pointer */ + rcu_read_lock(); + + /* Write metadata stream id before payload */ + if (stream->metadata_flag && relayd) { + /* + * Lock the control socket for the complete duration of the function + * since from this point on we will use the socket. + */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + + metadata_id = htobe64(stream->relayd_stream_id); + do { + ret = write(ctx->consumer_thread_pipe[1], + (void *) &metadata_id, + sizeof(stream->relayd_stream_id)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write metadata stream id"); + written = ret; + goto end; + } + DBG("Metadata stream id %zu written before data", + stream->relayd_stream_id); + } while (len > 0) { - DBG("splice chan to pipe offset %lu (fd : %d)", - (unsigned long)offset, fd); - ret = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len, + DBG("splice chan to pipe offset %lu of len %lu (fd : %d)", + (unsigned long)offset, len, fd); + ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len, SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice chan to pipe ret %ld", ret); - if (ret < 0) { - errno = -ret; + DBG("splice chan to pipe, ret %zd", ret_splice); + if (ret_splice < 0) { perror("Error in relay splice"); + if (written == 0) { + written = ret_splice; + } + ret = errno; goto splice_error; } - ret = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, ret, - SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice pipe to file %ld", ret); - if (ret < 0) { - errno = -ret; + /* Handle stream on the relayd if the output is on the network */ + if (relayd) { + if (stream->metadata_flag) { + /* Update counter to fit the spliced data */ + ret_splice += sizeof(stream->relayd_stream_id); + len += sizeof(stream->relayd_stream_id); + /* + * We do this so the return value can match the len passed as + * argument to this function. + */ + written -= sizeof(stream->relayd_stream_id); + } + + ret = consumer_handle_stream_before_relayd(stream, ret_splice); + if (ret >= 0) { + /* Use the returned socket. */ + outfd = ret; + } else { + if (outfd == -1) { + ERR("Remote relayd disconnected. Stopping"); + goto end; + } + } + } + + DBG3("Kernel consumer splice data in %d to out %d", + ctx->consumer_thread_pipe[0], outfd); + ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, + ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice pipe to file, ret %zd", ret_splice); + if (ret_splice < 0) { perror("Error in file splice"); + if (written == 0) { + written = ret_splice; + } + ret = errno; goto splice_error; } - len -= ret; - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; + if (ret_splice > len) { + errno = EINVAL; + PERROR("Wrote more data than requested %zd (len: %lu)", + ret_splice, len); + written += ret_splice; + ret = errno; + goto splice_error; + } + len -= ret_splice; + + /* This call is useless on a socket so better save a syscall. */ + if (!relayd) { + /* This won't block, but will start writeout asynchronously */ + lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret_splice; + } + written += ret_splice; } lttng_consumer_sync_trace_file(stream, orig_offset); + ret = ret_splice; + goto end; splice_error: /* send the appropriate error description to sessiond */ - switch(ret) { + switch (ret) { case EBADF: lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF); break; @@ -149,7 +308,13 @@ splice_error: } end: - return ret; + if (relayd && stream->metadata_flag) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } + + rcu_read_unlock(); + + return written; } /* @@ -210,6 +375,85 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } switch (msg.cmd_type) { + case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: + { + int fd; + struct consumer_relayd_sock_pair *relayd; + + DBG("Consumer adding relayd socket"); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.relayd_sock.net_index); + if (relayd == NULL) { + /* Not found. Allocate one. */ + relayd = consumer_allocate_relayd_sock_pair( + msg.u.relayd_sock.net_index); + if (relayd == NULL) { + lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + goto end_nosignal; + } + } + + /* Poll on consumer socket. */ + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + return -EINTR; + } + + /* Get relayd socket from session daemon */ + ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); + if (ret != sizeof(fd)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + goto end_nosignal; + } + + /* Copy socket information and received FD */ + switch (msg.u.relayd_sock.type) { + case LTTNG_STREAM_CONTROL: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock); + + ret = lttcomm_create_sock(&relayd->control_sock); + if (ret < 0) { + goto end_nosignal; + } + + /* Close the created socket fd which is useless */ + close(relayd->control_sock.fd); + + /* Assign new file descriptor */ + relayd->control_sock.fd = fd; + break; + case LTTNG_STREAM_DATA: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock); + ret = lttcomm_create_sock(&relayd->data_sock); + if (ret < 0) { + goto end_nosignal; + } + + /* Close the created socket fd which is useless */ + close(relayd->data_sock.fd); + + /* Assign new file descriptor */ + relayd->data_sock.fd = fd; + break; + default: + ERR("Unknown relayd socket type"); + goto end_nosignal; + } + + DBG("Consumer %s socket created successfully with net idx %d (fd: %d)", + msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data", + relayd->net_seq_idx, fd); + + /* + * Add relayd socket pair to consumer data hashtable. If object already + * exists or on error, the function gracefully returns. + */ + consumer_add_relayd(relayd); + + goto end_nosignal; + } case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; @@ -237,21 +481,22 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_ADD_STREAM: { - struct lttng_consumer_stream *new_stream; int fd; + struct consumer_relayd_sock_pair *relayd = NULL; + struct lttng_consumer_stream *new_stream; /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { return -EINTR; } + + /* Get stream file descriptor from socket */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); return ret; } - DBG("consumer_add_stream %s (%d)", msg.u.stream.path_name, - fd); new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, fd, fd, @@ -260,11 +505,33 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.output, msg.u.stream.path_name, msg.u.stream.uid, - msg.u.stream.gid); + msg.u.stream.gid, + msg.u.stream.net_index, + msg.u.stream.metadata_flag); if (new_stream == NULL) { lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); goto end; } + + /* The stream is not metadata. Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.stream.net_index); + if (relayd != NULL) { + /* Add stream on the relayd */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_add_stream(&relayd->control_sock, + msg.u.stream.name, msg.u.stream.path_name, + &new_stream->relayd_stream_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + } else if (msg.u.stream.net_index != -1) { + ERR("Network sequence index %d unknown. Not adding stream.", + msg.u.stream.net_index); + free(new_stream); + goto end; + } + if (ctx->on_recv_stream != NULL) { ret = ctx->on_recv_stream(new_stream); if (ret == 0) { @@ -275,6 +542,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { consumer_add_stream(new_stream); } + + DBG("Kernel consumer_add_stream (%d)", fd); break; } case LTTNG_CONSUMER_UPDATE_STREAM: @@ -296,11 +565,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } end: - /* signal the poll thread */ - ret = write(ctx->consumer_poll_pipe[1], "4", 1); - if (ret < 0) { - perror("write consumer poll"); - } + /* + * Wake-up the other end by writing a null byte in the pipe + * (non-blocking). Important note: Because writing into the + * pipe is non-blocking (and therefore we allow dropping wakeup + * data, as long as there is wakeup data present in the pipe + * buffer to wake up the other end), the other end should + * perform the following sequence for waiting: + * 1) empty the pipe (reads). + * 2) perform update operation. + * 3) wait on the pipe (poll). + */ + do { + ret = write(ctx->consumer_poll_pipe[1], "", 1); + } while (ret < 0 && errno == EINTR); end_nosignal: return 0; } @@ -308,12 +586,12 @@ end_nosignal: /* * Consume data on a file descriptor and write it on a trace file. */ -int lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, +ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len; int err; - long ret = 0; + ssize_t ret = 0; int infd = stream->wait_fd; DBG("In read_subbuffer (infd : %d)", infd); @@ -343,13 +621,15 @@ int lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* splice the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len); - if (ret < 0) { + if (ret != len) { /* * display the error but continue processing to try * to release the subbuffer */ - ERR("Error splicing to tracefile"); + ERR("Error splicing to tracefile (ret: %ld != len: %ld)", + ret, len); } + break; case LTTNG_EVENT_MMAP: /* read the used subbuffer size */ @@ -361,7 +641,7 @@ int lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); - if (ret < 0) { + if (ret != len) { /* * display the error but continue processing to try * to release the subbuffer @@ -395,7 +675,7 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) int ret; /* Opening the tracefile in write mode */ - if (stream->path_name != NULL) { + if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) { ret = run_as_open(stream->path_name, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO,