X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=c842de9358965c79e31f489b789906f3d1deddec;hb=53efb85a242809ed5ed21e9ab40effa696ecbc6f;hp=ffbecf7163ef394b4ffce3b8bf6874d3b5b4375f;hpb=994ab360b3264e19fdf590178601fa1f9f6489d0;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index ffbecf716..c842de935 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -250,6 +250,32 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key) return channel; } +/* + * There is a possibility that the consumer does not have enough time between + * the close of the channel on the session daemon and the cleanup in here thus + * once we have a channel add with an existing key, we know for sure that this + * channel will eventually get cleaned up by all streams being closed. + * + * This function just nullifies the already existing channel key. + */ +static void steal_channel_key(uint64_t key) +{ + struct lttng_consumer_channel *channel; + + rcu_read_lock(); + channel = consumer_find_channel(key); + if (channel) { + channel->key = (uint64_t) -1ULL; + /* + * We don't want the lookup to match, but we still need to iterate on + * this channel when iterating over the hash table. Just change the + * node key. + */ + channel->node.key = (uint64_t) -1ULL; + } + rcu_read_unlock(); +} + static void free_channel_rcu(struct rcu_head *head) { struct lttng_ht_node_u64 *node = @@ -979,43 +1005,35 @@ end: /* * Add a channel to the global list protected by a mutex. * - * On success 0 is returned else a negative value. + * Always return 0 indicating success. */ int consumer_add_channel(struct lttng_consumer_channel *channel, struct lttng_consumer_local_data *ctx) { - int ret = 0; - struct lttng_ht_node_u64 *node; - struct lttng_ht_iter iter; - pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&channel->lock); pthread_mutex_lock(&channel->timer_lock); - rcu_read_lock(); - lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter); - node = lttng_ht_iter_get_node_u64(&iter); - if (node != NULL) { - /* Channel already exist. Ignore the insertion */ - ERR("Consumer add channel key %" PRIu64 " already exists!", - channel->key); - ret = -EEXIST; - goto end; - } + /* + * This gives us a guarantee that the channel we are about to add to the + * channel hash table will be unique. See this function comment on the why + * we need to steel the channel key at this stage. + */ + steal_channel_key(channel->key); + rcu_read_lock(); lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node); - -end: rcu_read_unlock(); + pthread_mutex_unlock(&channel->timer_lock); pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); - if (!ret && channel->wait_fd != -1 && - channel->type == CONSUMER_CHANNEL_TYPE_DATA) { + if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) { notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD); } - return ret; + + return 0; } /* @@ -1683,7 +1701,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( if (stream->net_seq_idx != (uint64_t) -1ULL) { relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd == NULL) { - ret = -EPIPE; + written = -ret; goto end; } } @@ -1701,7 +1719,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* Write metadata stream id before payload */ if (relayd) { - int total_len = len; + unsigned long total_len = len; if (stream->metadata_flag) { /* @@ -1714,31 +1732,21 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( padding); if (ret < 0) { written = ret; - /* Socket operation failed. We consider the relayd dead */ - if (ret == -EBADF) { - WARN("Remote relayd disconnected. Stopping"); - relayd_hang_up = 1; - goto write_error; - } - goto end; + relayd_hang_up = 1; + goto write_error; } total_len += sizeof(struct lttcomm_relayd_metadata_payload); } ret = write_relayd_stream_header(stream, total_len, padding, relayd); - if (ret >= 0) { - /* Use the returned socket. */ - outfd = ret; - } else { - /* Socket operation failed. We consider the relayd dead */ - if (ret == -EBADF) { - WARN("Remote relayd disconnected. Stopping"); - relayd_hang_up = 1; - goto write_error; - } - goto end; + if (ret < 0) { + written = ret; + relayd_hang_up = 1; + goto write_error; } + /* Use the returned socket. */ + outfd = ret; } else { /* No streaming, we have to set the len with the full padding */ len += padding; @@ -1755,6 +1763,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( stream->out_fd, &(stream->tracefile_count_current), &stream->out_fd); if (ret < 0) { + written = ret; ERR("Rotating output file"); goto end; } @@ -1766,6 +1775,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( stream->chan->tracefile_size, stream->tracefile_count_current); if (ret < 0) { + written = ret; goto end; } stream->index_fd = ret; @@ -1788,28 +1798,24 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( DBG("splice chan to pipe, ret %zd", ret_splice); if (ret_splice < 0) { ret = errno; - if (written == 0) { - written = ret_splice; - } + written = -ret; PERROR("Error in relay splice"); goto splice_error; } /* Handle stream on the relayd if the output is on the network */ - if (relayd) { - if (stream->metadata_flag) { - size_t metadata_payload_size = - sizeof(struct lttcomm_relayd_metadata_payload); + if (relayd && stream->metadata_flag) { + size_t metadata_payload_size = + sizeof(struct lttcomm_relayd_metadata_payload); - /* Update counter to fit the spliced data */ - ret_splice += metadata_payload_size; - len += metadata_payload_size; - /* - * We do this so the return value can match the len passed as - * argument to this function. - */ - written -= metadata_payload_size; - } + /* Update counter to fit the spliced data */ + ret_splice += metadata_payload_size; + len += metadata_payload_size; + /* + * We do this so the return value can match the len passed as + * argument to this function. + */ + written -= metadata_payload_size; } /* Splice data out */ @@ -1818,24 +1824,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( DBG("Consumer splice pipe to file, ret %zd", ret_splice); if (ret_splice < 0) { ret = errno; - if (written == 0) { - written = ret_splice; - } - /* Socket operation failed. We consider the relayd dead */ - if (errno == EBADF || errno == EPIPE || errno == ESPIPE) { - WARN("Remote relayd disconnected. Stopping"); - relayd_hang_up = 1; - goto write_error; - } - PERROR("Error in file splice"); - goto splice_error; + written = -ret; + relayd_hang_up = 1; + goto write_error; } else if (ret_splice > len) { /* * We don't expect this code path to be executed but you never know * so this is an extra protection agains a buggy splice(). */ - written += ret_splice; ret = errno; + written += ret_splice; PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice, len); goto splice_error; @@ -3615,6 +3613,7 @@ int consumer_send_status_msg(int sock, int ret_code) { struct lttcomm_consumer_status_msg msg; + memset(&msg, 0, sizeof(msg)); msg.ret_code = ret_code; return lttcomm_send_unix_sock(sock, &msg, sizeof(msg)); @@ -3632,6 +3631,7 @@ int consumer_send_status_channel(int sock, assert(sock >= 0); + memset(&msg, 0, sizeof(msg)); if (!channel) { msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL; } else {