X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=f6add2c2c88327d05f1dd1ce90b49241df74c1f8;hp=47c0d460ffb60cc612d9806325602c6eac459023;hb=00e2e675d54dc726a7c8f8887c889cc8ef022003;hpb=b8aa16822f579a6e15b41d2761801a0a65d5f2a5 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 47c0d460f..f6add2c2c 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -31,6 +31,7 @@ #include #include +#include #include #include "ust-consumer.h" @@ -52,6 +53,16 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( long ret = 0, written = 0; off_t orig_offset = stream->out_fd_offset; int outfd = stream->out_fd; + uint64_t metadata_id; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* Flag that the current stream if set for network streaming. */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + goto end; + } + } /* get the offset inside the fd to mmap */ ret = ustctl_get_mmap_read_offset(stream->chan->handle, @@ -62,6 +73,37 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( written = ret; goto end; } + + /* Handle stream on the relayd if the output is on the network */ + if (relayd) { + if (stream->metadata_flag) { + /* Only lock if metadata since we use the control socket. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + } + + ret = consumer_handle_stream_before_relayd(stream, len); + if (ret >= 0) { + outfd = ret; + + /* Write metadata stream id before payload */ + if (stream->metadata_flag) { + metadata_id = htobe64(stream->relayd_stream_id); + do { + ret = write(outfd, (void *) &metadata_id, + sizeof(stream->relayd_stream_id)); + if (ret < 0) { + PERROR("write metadata stream id"); + written = ret; + goto end; + } + } while (errno == EINTR); + DBG("Metadata stream id %zu written before data", + stream->relayd_stream_id); + } + } + /* Else, use the default set before which is the filesystem. */ + } + while (len > 0) { ret = write(outfd, stream->mmap_base + mmap_offset, len); if (ret < 0) { @@ -76,21 +118,30 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( goto end; } } else if (ret > len) { - PERROR("Error in file write"); + PERROR("ret %ld > len %lu", ret, len); written += ret; goto end; } else { len -= ret; mmap_offset += ret; } - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; + DBG("UST mmap write() ret %ld (len %lu)", ret, len); + + /* This call is useless on a socket so better save a syscall. */ + if (!relayd) { + /* This won't block, but will start writeout asynchronously */ + lttng_sync_file_range(outfd, stream->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret; + } written += ret; } lttng_consumer_sync_trace_file(stream, orig_offset); + end: + if (relayd && stream->metadata_flag) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } return written; } @@ -163,6 +214,84 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } switch (msg.cmd_type) { + case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: + { + int fd; + struct consumer_relayd_sock_pair *relayd; + + DBG("UST Consumer adding relayd socket"); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.relayd_sock.net_index); + if (relayd == NULL) { + /* Not found. Allocate one. */ + relayd = consumer_allocate_relayd_sock_pair( + msg.u.relayd_sock.net_index); + if (relayd == NULL) { + lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + goto end_nosignal; + } + } + + /* Poll on consumer socket. */ + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + return -EINTR; + } + + /* Get relayd socket from session daemon */ + ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); + if (ret != sizeof(fd)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + goto end_nosignal; + } + + /* Copy socket information and received FD */ + switch (msg.u.relayd_sock.type) { + case LTTNG_STREAM_CONTROL: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock); + ret = lttcomm_create_sock(&relayd->control_sock); + if (ret < 0) { + goto end_nosignal; + } + + /* Close the created socket fd which is useless */ + close(relayd->control_sock.fd); + + /* Assign new file descriptor */ + relayd->control_sock.fd = fd; + break; + case LTTNG_STREAM_DATA: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock); + ret = lttcomm_create_sock(&relayd->data_sock); + if (ret < 0) { + goto end_nosignal; + } + + /* Close the created socket fd which is useless */ + close(relayd->data_sock.fd); + + /* Assign new file descriptor */ + relayd->data_sock.fd = fd; + break; + default: + ERR("Unknown relayd socket type"); + goto end_nosignal; + } + + DBG("Consumer %s socket created successfully with net idx %d (fd: %d)", + msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data", + relayd->net_seq_idx, fd); + + /* + * Add relayd socket pair to consumer data hashtable. If object already + * exists or on error, the function gracefully returns. + */ + consumer_add_relayd(relayd); + + goto end_nosignal; + } case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; @@ -206,6 +335,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *new_stream; int fds[2]; size_t nb_fd = 2; + struct consumer_relayd_sock_pair *relayd = NULL; /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { @@ -217,8 +347,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } - DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, - fds[0], fds[1]); assert(msg.u.stream.output == LTTNG_EVENT_MMAP); new_stream = consumer_allocate_stream(msg.u.channel.channel_key, msg.u.stream.stream_key, @@ -228,11 +356,33 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.output, msg.u.stream.path_name, msg.u.stream.uid, - msg.u.stream.gid); + msg.u.stream.gid, + msg.u.stream.net_index, + msg.u.stream.metadata_flag); if (new_stream == NULL) { lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); goto end; } + + /* The stream is not metadata. Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.stream.net_index); + if (relayd != NULL) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + /* Add stream on the relayd */ + ret = relayd_add_stream(&relayd->control_sock, + msg.u.stream.name, msg.u.stream.path_name, + &new_stream->relayd_stream_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + } else if (msg.u.stream.net_index != -1) { + ERR("Network sequence index %d unknown. Not adding stream.", + msg.u.stream.net_index); + free(new_stream); + goto end; + } + if (ctx->on_recv_stream != NULL) { ret = ctx->on_recv_stream(new_stream); if (ret == 0) { @@ -243,6 +393,10 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { consumer_add_stream(new_stream); } + + DBG("UST consumer_add_stream %s (%d,%d) with relayd id %lu", + msg.u.stream.path_name, fds[0], fds[1], + new_stream->relayd_stream_id); break; } case LTTNG_CONSUMER_UPDATE_STREAM: @@ -412,7 +566,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) int ret; /* Opening the tracefile in write mode */ - if (stream->path_name != NULL) { + if (stream->path_name != NULL && stream->net_seq_idx == -1) { ret = run_as_open(stream->path_name, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO,