X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=8ed279f44e16db7e73a14d8947c9e87b143f6946;hp=3c9687306dc2aa2e904ef35f9954bdcc37e1d436;hb=00e2e675d54dc726a7c8f8887c889cc8ef022003;hpb=b8aa16822f579a6e15b41d2761801a0a65d5f2a5 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 3c9687306..8ed279f44 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -31,7 +31,9 @@ #include #include #include +#include #include +#include #include "kernel-consumer.h" @@ -52,7 +54,18 @@ ssize_t lttng_kconsumer_on_read_subbuffer_mmap( ssize_t ret = 0, written = 0; off_t orig_offset = stream->out_fd_offset; int fd = stream->wait_fd; + /* Default is on the disk */ int outfd = stream->out_fd; + uint64_t metadata_id; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* Flag that the current stream if set for network streaming. */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + goto end; + } + } /* get the offset inside the fd to mmap */ ret = kernctl_get_mmap_read_offset(fd, &mmap_offset); @@ -63,6 +76,49 @@ ssize_t lttng_kconsumer_on_read_subbuffer_mmap( goto end; } + /* RCU lock for the relayd pointer */ + rcu_read_lock(); + + /* Handle stream on the relayd if the output is on the network */ + if (relayd) { + /* + * Lock the control socket for the complete duration of the function + * since from this point on we will use the socket. + */ + if (stream->metadata_flag) { + /* Metadata requires the control socket. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + } + + ret = consumer_handle_stream_before_relayd(stream, len); + if (ret >= 0) { + /* Use the returned socket. */ + outfd = ret; + + /* Write metadata stream id before payload */ + if (stream->metadata_flag) { + metadata_id = htobe64(stream->relayd_stream_id); + do { + ret = write(outfd, (void *) &metadata_id, + sizeof(stream->relayd_stream_id)); + if (ret < 0) { + PERROR("write metadata stream id"); + written = ret; + goto end; + } + } while (errno == EINTR); + DBG("Metadata stream id %zu written before data", + stream->relayd_stream_id); + /* + * We do this so the return value can match the len passed as + * argument to this function. + */ + written -= sizeof(stream->relayd_stream_id); + } + } + /* Else, use the default set before which is the filesystem. */ + } + while (len > 0) { ret = write(outfd, stream->mmap_base + mmap_offset, len); if (ret < 0) { @@ -84,14 +140,26 @@ ssize_t lttng_kconsumer_on_read_subbuffer_mmap( len -= ret; mmap_offset += ret; } - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; + + /* This call is useless on a socket so better save a syscall. */ + if (!relayd) { + /* This won't block, but will start writeout asynchronously */ + lttng_sync_file_range(outfd, stream->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret; + } written += ret; } lttng_consumer_sync_trace_file(stream, orig_offset); + end: + /* Unlock only if ctrl socket used */ + if (relayd && stream->metadata_flag) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } + + rcu_read_unlock(); + return written; } @@ -104,54 +172,125 @@ ssize_t lttng_kconsumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len) { - ssize_t ret = 0, written = 0; + ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; off_t orig_offset = stream->out_fd_offset; int fd = stream->wait_fd; + /* Default is on the disk */ int outfd = stream->out_fd; + uint64_t metadata_id; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* Flag that the current stream if set for network streaming. */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + goto end; + } + } + + /* RCU lock for the relayd pointer */ + rcu_read_lock(); + + /* Write metadata stream id before payload */ + if (stream->metadata_flag && relayd) { + /* + * Lock the control socket for the complete duration of the function + * since from this point on we will use the socket. + */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + + do { + metadata_id = htobe64(stream->relayd_stream_id); + ret = write(ctx->consumer_thread_pipe[1], + (void *) &metadata_id, + sizeof(stream->relayd_stream_id)); + if (ret < 0) { + PERROR("write metadata stream id"); + written = ret; + goto end; + } + } while (errno == EINTR); + DBG("Metadata stream id %zu written before data", + stream->relayd_stream_id); + } while (len > 0) { - DBG("splice chan to pipe offset %lu (fd : %d)", - (unsigned long)offset, fd); - ret = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len, + DBG("splice chan to pipe offset %lu of len %lu (fd : %d)", + (unsigned long)offset, len, fd); + ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len, SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice chan to pipe ret %zd", ret); - if (ret < 0) { + DBG("splice chan to pipe, ret %zd", ret_splice); + if (ret_splice < 0) { perror("Error in relay splice"); if (written == 0) { - written = ret; + written = ret_splice; } ret = errno; goto splice_error; } - ret = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, ret, - SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice pipe to file %zd", ret); - if (ret < 0) { + /* Handle stream on the relayd if the output is on the network */ + if (relayd) { + if (stream->metadata_flag) { + /* Update counter to fit the spliced data */ + ret_splice += sizeof(stream->relayd_stream_id); + len += sizeof(stream->relayd_stream_id); + /* + * We do this so the return value can match the len passed as + * argument to this function. + */ + written -= sizeof(stream->relayd_stream_id); + } + + ret = consumer_handle_stream_before_relayd(stream, ret_splice); + if (ret >= 0) { + /* Use the returned socket. */ + outfd = ret; + } else { + if (outfd == -1) { + ERR("Remote relayd disconnected. Stopping"); + goto end; + } + } + } + + DBG3("Kernel consumer splice data in %d to out %d", + ctx->consumer_thread_pipe[0], outfd); + ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, + ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice pipe to file, ret %zd", ret_splice); + if (ret_splice < 0) { perror("Error in file splice"); if (written == 0) { - written = ret; + written = ret_splice; } ret = errno; goto splice_error; } - if (ret > len) { + if (ret_splice > len) { errno = EINVAL; - perror("Wrote more data than requested"); - written += ret; + PERROR("Wrote more data than requested %zd (len: %lu)", + ret_splice, len); + written += ret_splice; ret = errno; goto splice_error; } - len -= ret; - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; - written += ret; + len -= ret_splice; + + /* This call is useless on a socket so better save a syscall. */ + if (!relayd) { + /* This won't block, but will start writeout asynchronously */ + lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret_splice; + } + written += ret_splice; } lttng_consumer_sync_trace_file(stream, orig_offset); + ret = ret_splice; + goto end; splice_error: @@ -172,6 +311,12 @@ splice_error: } end: + if (relayd && stream->metadata_flag) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } + + rcu_read_unlock(); + return written; } @@ -233,6 +378,85 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } switch (msg.cmd_type) { + case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: + { + int fd; + struct consumer_relayd_sock_pair *relayd; + + DBG("Consumer adding relayd socket"); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.relayd_sock.net_index); + if (relayd == NULL) { + /* Not found. Allocate one. */ + relayd = consumer_allocate_relayd_sock_pair( + msg.u.relayd_sock.net_index); + if (relayd == NULL) { + lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + goto end_nosignal; + } + } + + /* Poll on consumer socket. */ + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + return -EINTR; + } + + /* Get relayd socket from session daemon */ + ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); + if (ret != sizeof(fd)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + goto end_nosignal; + } + + /* Copy socket information and received FD */ + switch (msg.u.relayd_sock.type) { + case LTTNG_STREAM_CONTROL: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock); + + ret = lttcomm_create_sock(&relayd->control_sock); + if (ret < 0) { + goto end_nosignal; + } + + /* Close the created socket fd which is useless */ + close(relayd->control_sock.fd); + + /* Assign new file descriptor */ + relayd->control_sock.fd = fd; + break; + case LTTNG_STREAM_DATA: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock); + ret = lttcomm_create_sock(&relayd->data_sock); + if (ret < 0) { + goto end_nosignal; + } + + /* Close the created socket fd which is useless */ + close(relayd->data_sock.fd); + + /* Assign new file descriptor */ + relayd->data_sock.fd = fd; + break; + default: + ERR("Unknown relayd socket type"); + goto end_nosignal; + } + + DBG("Consumer %s socket created successfully with net idx %d (fd: %d)", + msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data", + relayd->net_seq_idx, fd); + + /* + * Add relayd socket pair to consumer data hashtable. If object already + * exists or on error, the function gracefully returns. + */ + consumer_add_relayd(relayd); + + goto end_nosignal; + } case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; @@ -260,21 +484,22 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_ADD_STREAM: { - struct lttng_consumer_stream *new_stream; int fd; + struct consumer_relayd_sock_pair *relayd = NULL; + struct lttng_consumer_stream *new_stream; /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { return -EINTR; } + + /* Get stream file descriptor from socket */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); return ret; } - DBG("consumer_add_stream %s (%d)", msg.u.stream.path_name, - fd); new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, fd, fd, @@ -283,11 +508,33 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.output, msg.u.stream.path_name, msg.u.stream.uid, - msg.u.stream.gid); + msg.u.stream.gid, + msg.u.stream.net_index, + msg.u.stream.metadata_flag); if (new_stream == NULL) { lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); goto end; } + + /* The stream is not metadata. Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.stream.net_index); + if (relayd != NULL) { + /* Add stream on the relayd */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_add_stream(&relayd->control_sock, + msg.u.stream.name, msg.u.stream.path_name, + &new_stream->relayd_stream_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + } else if (msg.u.stream.net_index != -1) { + ERR("Network sequence index %d unknown. Not adding stream.", + msg.u.stream.net_index); + free(new_stream); + goto end; + } + if (ctx->on_recv_stream != NULL) { ret = ctx->on_recv_stream(new_stream); if (ret == 0) { @@ -298,6 +545,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { consumer_add_stream(new_stream); } + + DBG("Kernel consumer_add_stream (%d)", fd); break; } case LTTNG_CONSUMER_UPDATE_STREAM: @@ -380,7 +629,8 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, * display the error but continue processing to try * to release the subbuffer */ - ERR("Error splicing to tracefile"); + ERR("Error splicing to tracefile (ret: %ld != len: %ld)", + ret, len); } break; @@ -428,7 +678,7 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) int ret; /* Opening the tracefile in write mode */ - if (stream->path_name != NULL) { + if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) { ret = run_as_open(stream->path_name, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO,