X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=07a68d8f755da7813d765357b5a2379f86b0b589;hb=173af62f4804133d4a7f45e34b6f72126f3eca5f;hp=f6add2c2c88327d05f1dd1ce90b49241df74c1f8;hpb=00e2e675d54dc726a7c8f8887c889cc8ef022003;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index f6add2c2c..07a68d8f7 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -56,10 +56,15 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( uint64_t metadata_id; struct consumer_relayd_sock_pair *relayd = NULL; + /* RCU lock for the relayd pointer */ + rcu_read_lock(); + /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != -1) { relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd == NULL) { + ERR("UST consumer mmap(), unable to find relay for index %d", + stream->net_seq_idx); goto end; } } @@ -76,12 +81,15 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( /* Handle stream on the relayd if the output is on the network */ if (relayd) { + unsigned long netlen = len; + if (stream->metadata_flag) { /* Only lock if metadata since we use the control socket. */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); + netlen += sizeof(stream->relayd_stream_id); } - ret = consumer_handle_stream_before_relayd(stream, len); + ret = consumer_handle_stream_before_relayd(stream, netlen); if (ret >= 0) { outfd = ret; @@ -91,12 +99,12 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( do { ret = write(outfd, (void *) &metadata_id, sizeof(stream->relayd_stream_id)); - if (ret < 0) { - PERROR("write metadata stream id"); - written = ret; - goto end; - } - } while (errno == EINTR); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write metadata stream id"); + written = ret; + goto end; + } DBG("Metadata stream id %zu written before data", stream->relayd_stream_id); } @@ -105,18 +113,15 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( } while (len > 0) { - ret = write(outfd, stream->mmap_base + mmap_offset, len); + do { + ret = write(outfd, stream->mmap_base + mmap_offset, len); + } while (ret < 0 && errno == EINTR); if (ret < 0) { - if (errno == EINTR) { - /* restart the interrupted system call */ - continue; - } else { - PERROR("Error in file write"); - if (written == 0) { - written = ret; - } - goto end; + PERROR("Error in file write"); + if (written == 0) { + written = ret; } + goto end; } else if (ret > len) { PERROR("ret %ld > len %lu", ret, len); written += ret; @@ -142,6 +147,7 @@ end: if (relayd && stream->metadata_flag) { pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } + rcu_read_unlock(); return written; } @@ -206,6 +212,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { + DBG("Consumer received unexpected message size %zd (expects %zu)", + ret, sizeof(msg)); lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); return ret; } @@ -213,6 +221,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return -ENOENT; } + /* relayd need RCU read-side lock */ + rcu_read_lock(); + switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { @@ -298,6 +309,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int fds[1]; size_t nb_fd = 1; + DBG("UST Consumer adding channel"); + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { return -EINTR; @@ -337,6 +350,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, size_t nb_fd = 2; struct consumer_relayd_sock_pair *relayd = NULL; + DBG("UST Consumer adding stream"); + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { return -EINTR; @@ -347,8 +362,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } + DBG("consumer_add_stream chan %d stream %d", + msg.u.stream.channel_key, + msg.u.stream.stream_key); + assert(msg.u.stream.output == LTTNG_EVENT_MMAP); - new_stream = consumer_allocate_stream(msg.u.channel.channel_key, + new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, fds[0], fds[1], msg.u.stream.state, @@ -399,6 +418,29 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_stream->relayd_stream_id); break; } + case LTTNG_CONSUMER_DESTROY_RELAYD: + { + struct consumer_relayd_sock_pair *relayd; + + DBG("UST consumer destroying relayd %zu", + msg.u.destroy_relayd.net_seq_idx); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx); + if (relayd == NULL) { + ERR("Unable to find relayd %zu", + msg.u.destroy_relayd.net_seq_idx); + } + + /* Set destroy flag for this object */ + uatomic_set(&relayd->destroy_flag, 1); + + /* Destroy the relayd if refcount is 0 else set the destroy flag. */ + if (uatomic_read(&relayd->refcount) == 0) { + consumer_destroy_relayd(relayd); + } + break; + } case LTTNG_CONSUMER_UPDATE_STREAM: { return -ENOSYS; @@ -434,8 +476,9 @@ end: */ do { ret = write(ctx->consumer_poll_pipe[1], "", 1); - } while (ret == -1UL && errno == EINTR); + } while (ret < 0 && errno == EINTR); end_nosignal: + rcu_read_unlock(); return 0; }