X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=3024af4a75157b92a46a67014800b9eab66c8e64;hp=1cd39ef34b7e957bf9cfa95ac93e7fb40374fb9c;hb=77c7c900d190f7fb4f99a456c767f069da7e72b8;hpb=6f94560a050962daad560dac2823bd97b0b1bf98 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 1cd39ef34..3024af4a7 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -18,6 +18,7 @@ #define _GNU_SOURCE #include +#include #include #include #include @@ -26,8 +27,8 @@ #include #include #include +#include #include -#include #include #include @@ -41,118 +42,14 @@ extern int consumer_poll_timeout; extern volatile int consumer_quit; /* - * Mmap the ring buffer, read it and write the data to the tracefile. - * - * Returns the number of bytes written, else negative value on error. - */ -ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) -{ - unsigned long mmap_offset; - long ret = 0, written = 0; - off_t orig_offset = stream->out_fd_offset; - int outfd = stream->out_fd; - uint64_t metadata_id; - struct consumer_relayd_sock_pair *relayd = NULL; - - /* Flag that the current stream if set for network streaming. */ - if (stream->net_seq_idx != -1) { - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd == NULL) { - goto end; - } - } - - /* get the offset inside the fd to mmap */ - ret = ustctl_get_mmap_read_offset(stream->chan->handle, - stream->buf, &mmap_offset); - if (ret != 0) { - errno = -ret; - PERROR("ustctl_get_mmap_read_offset"); - written = ret; - goto end; - } - - /* Handle stream on the relayd if the output is on the network */ - if (relayd) { - if (stream->metadata_flag) { - /* Only lock if metadata since we use the control socket. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - } - - ret = consumer_handle_stream_before_relayd(stream, len); - if (ret >= 0) { - outfd = ret; - - /* Write metadata stream id before payload */ - if (stream->metadata_flag) { - metadata_id = htobe64(stream->relayd_stream_id); - do { - ret = write(outfd, (void *) &metadata_id, - sizeof(stream->relayd_stream_id)); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - PERROR("write metadata stream id"); - written = ret; - goto end; - } - DBG("Metadata stream id %zu written before data", - stream->relayd_stream_id); - } - } - /* Else, use the default set before which is the filesystem. */ - } - - while (len > 0) { - do { - ret = write(outfd, stream->mmap_base + mmap_offset, len); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - PERROR("Error in file write"); - if (written == 0) { - written = ret; - } - goto end; - } else if (ret > len) { - PERROR("ret %ld > len %lu", ret, len); - written += ret; - goto end; - } else { - len -= ret; - mmap_offset += ret; - } - DBG("UST mmap write() ret %ld (len %lu)", ret, len); - - /* This call is useless on a socket so better save a syscall. */ - if (!relayd) { - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; - } - written += ret; - } - lttng_consumer_sync_trace_file(stream, orig_offset); - -end: - if (relayd && stream->metadata_flag) { - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - } - return written; -} - -/* - * Splice the data from the ring buffer to the tracefile. - * - * Returns the number of bytes spliced. + * Wrapper over the mmap() read offset from ust-ctl library. Since this can be + * compiled out, we isolate it in this library. */ -ssize_t lttng_ustconsumer_on_read_subbuffer_splice( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) +int lttng_ustctl_get_mmap_read_offset(struct lttng_ust_shm_handle *handle, + struct lttng_ust_lib_ring_buffer *buf, unsigned long *off) { - return -ENOSYS; -} + return ustctl_get_mmap_read_offset(handle, buf, off); +}; /* * Take a snapshot for a specific fd @@ -203,6 +100,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { + DBG("Consumer received unexpected message size %zd (expects %zu)", + ret, sizeof(msg)); lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); return ret; } @@ -210,6 +109,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return -ENOENT; } + /* relayd needs RCU read-side lock */ + rcu_read_lock(); + switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { @@ -232,6 +134,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Poll on consumer socket. */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } @@ -295,13 +198,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int fds[1]; size_t nb_fd = 1; + DBG("UST Consumer adding channel"); + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); if (ret != sizeof(fds)) { lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + rcu_read_unlock(); return ret; } @@ -334,18 +241,26 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, size_t nb_fd = 2; struct consumer_relayd_sock_pair *relayd = NULL; + DBG("UST Consumer adding stream"); + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); if (ret != sizeof(fds)) { lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + rcu_read_unlock(); return ret; } + DBG("consumer_add_stream chan %d stream %d", + msg.u.stream.channel_key, + msg.u.stream.stream_key); + assert(msg.u.stream.output == LTTNG_EVENT_MMAP); - new_stream = consumer_allocate_stream(msg.u.channel.channel_key, + new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, fds[0], fds[1], msg.u.stream.state, @@ -358,7 +273,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.metadata_flag); if (new_stream == NULL) { lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); - goto end; + goto end_nosignal; } /* The stream is not metadata. Get relayd reference if exists. */ @@ -371,13 +286,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, &new_stream->relayd_stream_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - goto end; + goto end_nosignal; } } else if (msg.u.stream.net_index != -1) { ERR("Network sequence index %d unknown. Not adding stream.", msg.u.stream.net_index); free(new_stream); - goto end; + goto end_nosignal; } if (ctx->on_recv_stream != NULL) { @@ -385,19 +300,43 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret == 0) { consumer_add_stream(new_stream); } else if (ret < 0) { - goto end; + goto end_nosignal; } } else { consumer_add_stream(new_stream); } - DBG("UST consumer_add_stream %s (%d,%d) with relayd id %lu", + DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64, msg.u.stream.path_name, fds[0], fds[1], new_stream->relayd_stream_id); break; } + case LTTNG_CONSUMER_DESTROY_RELAYD: + { + struct consumer_relayd_sock_pair *relayd; + + DBG("UST consumer destroying relayd %" PRIu64, + msg.u.destroy_relayd.net_seq_idx); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx); + if (relayd == NULL) { + ERR("Unable to find relayd %" PRIu64, msg.u.destroy_relayd.net_seq_idx); + goto end_nosignal; + } + + /* Set destroy flag for this object */ + uatomic_set(&relayd->destroy_flag, 1); + + /* Destroy the relayd if refcount is 0 else set the destroy flag. */ + if (uatomic_read(&relayd->refcount) == 0) { + consumer_destroy_relayd(relayd); + } + goto end_nosignal; + } case LTTNG_CONSUMER_UPDATE_STREAM: { + rcu_read_unlock(); return -ENOSYS; #if 0 if (ctx->on_update_stream != NULL) { @@ -411,20 +350,20 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); } -#endif break; +#endif } default: break; } -end: + /* - * Wake-up the other end by writing a null byte in the pipe - * (non-blocking). Important note: Because writing into the - * pipe is non-blocking (and therefore we allow dropping wakeup - * data, as long as there is wakeup data present in the pipe - * buffer to wake up the other end), the other end should - * perform the following sequence for waiting: + * Wake-up the other end by writing a null byte in the pipe (non-blocking). + * Important note: Because writing into the pipe is non-blocking (and + * therefore we allow dropping wakeup data, as long as there is wakeup data + * present in the pipe buffer to wake up the other end), the other end + * should perform the following sequence for waiting: + * * 1) empty the pipe (reads). * 2) perform update operation. * 3) wait on the pipe (poll). @@ -433,6 +372,7 @@ end: ret = write(ctx->consumer_poll_pipe[1], "", 1); } while (ret < 0 && errno == EINTR); end_nosignal: + rcu_read_unlock(); return 0; } @@ -550,7 +490,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, * display the error but continue processing to try * to release the subbuffer */ - ERR("Error writing to tracefile"); + ERR("Error writing to tracefile (expected: %ld, got: %ld)", ret, len); } err = ustctl_put_next_subbuf(handle, buf); assert(err == 0);