X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=ffbecf7163ef394b4ffce3b8bf6874d3b5b4375f;hb=994ab360b3264e19fdf590178601fa1f9f6489d0;hp=9d5a36970551fd19a5cb2288ca15026f1bf8dc70;hpb=6d574024f868e661ae688ecbc47a110a1311c57e;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 9d5a36970..ffbecf716 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -150,6 +150,31 @@ error: return (int) ret; } +/* + * Cleanup the stream list of a channel. Those streams are not yet globally + * visible + */ +static void clean_channel_stream_list(struct lttng_consumer_channel *channel) +{ + struct lttng_consumer_stream *stream, *stmp; + + assert(channel); + + /* Delete streams that might have been left in the stream list. */ + cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, + send_node) { + cds_list_del(&stream->send_node); + /* + * Once a stream is added to this list, the buffers were created so we + * have a guarantee that this call will succeed. Setting the monitor + * mode to 0 so we don't lock nor try to delete the stream from the + * global hash table. + */ + stream->monitor = 0; + consumer_stream_destroy(stream, NULL); + } +} + /* * Find a stream. The consumer_data.lock must be locked during this * call. @@ -292,23 +317,14 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) { int ret; struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream, *stmp; DBG("Consumer delete channel key %" PRIu64, channel->key); pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&channel->lock); - /* Delete streams that might have been left in the stream list. */ - cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, - send_node) { - cds_list_del(&stream->send_node); - /* - * Once a stream is added to this list, the buffers were created so - * we have a guarantee that this call will succeed. - */ - consumer_stream_destroy(stream, NULL); - } + /* Destroy streams that might have been left in the stream list. */ + clean_channel_stream_list(channel); if (channel->live_timer_enabled == 1) { consumer_timer_live_stop(channel); @@ -1441,7 +1457,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( { unsigned long mmap_offset; void *mmap_base; - ssize_t ret = 0, written = 0; + ssize_t ret = 0; off_t orig_offset = stream->out_fd_offset; /* Default is on the disk */ int outfd = stream->out_fd; @@ -1465,9 +1481,9 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( case LTTNG_CONSUMER_KERNEL: mmap_base = stream->mmap_base; ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); - if (ret != 0) { + if (ret < 0) { + ret = -errno; PERROR("tracer ctl get_mmap_read_offset"); - written = -errno; goto end; } break; @@ -1476,13 +1492,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( mmap_base = lttng_ustctl_get_mmap_base(stream); if (!mmap_base) { ERR("read mmap get mmap base for stream %s", stream->name); - written = -EPERM; + ret = -EPERM; goto end; } ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset); if (ret != 0) { PERROR("tracer ctl get_mmap_read_offset"); - written = ret; + ret = -EINVAL; goto end; } break; @@ -1506,30 +1522,20 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } ret = write_relayd_stream_header(stream, netlen, padding, relayd); - if (ret >= 0) { - /* Use the returned socket. */ - outfd = ret; + if (ret < 0) { + relayd_hang_up = 1; + goto write_error; + } + /* Use the returned socket. */ + outfd = ret; - /* Write metadata stream id before payload */ - if (stream->metadata_flag) { - ret = write_relayd_metadata_id(outfd, stream, relayd, padding); - if (ret < 0) { - written = ret; - /* Socket operation failed. We consider the relayd dead */ - if (ret == -EPIPE || ret == -EINVAL) { - relayd_hang_up = 1; - goto write_error; - } - goto end; - } - } - } else { - /* Socket operation failed. We consider the relayd dead */ - if (ret == -EPIPE || ret == -EINVAL) { + /* Write metadata stream id before payload */ + if (stream->metadata_flag) { + ret = write_relayd_metadata_id(outfd, stream, relayd, padding); + if (ret < 0) { relayd_hang_up = 1; goto write_error; } - /* Else, use the default set before which is the filesystem. */ } } else { /* No streaming, we have to set the len with the full padding */ @@ -1586,13 +1592,12 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( * amount written. */ if (ret < 0) { - written = -errno; - } else { - written = ret; + ret = -errno; } + relayd_hang_up = 1; /* Socket operation failed. We consider the relayd dead */ - if (errno == EPIPE || errno == EINVAL) { + if (errno == EPIPE || errno == EINVAL || errno == EBADF) { /* * This is possible if the fd is closed on the other side * (outfd) or any write problem. It can be verbose a bit for a @@ -1600,16 +1605,13 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( * abruptly. This can happen so set this to a DBG statement. */ DBG("Consumer mmap write detected relayd hang up"); - relayd_hang_up = 1; - goto write_error; + } else { + /* Unhandled error, print it and stop function right now. */ + PERROR("Error in write mmap (ret %zd != len %lu)", ret, len); } - - /* Unhandled error, print it and stop function right now. */ - PERROR("Error in write mmap (ret %zd != len %lu)", ret, len); - goto end; + goto write_error; } stream->output_written += ret; - written = ret; /* This call is useless on a socket so better save a syscall. */ if (!relayd) { @@ -1636,7 +1638,7 @@ end: } rcu_read_unlock(); - return written; + return ret; } /* @@ -2652,12 +2654,17 @@ void consumer_close_channel_streams(struct lttng_consumer_channel *channel) break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - /* - * Note: a mutex is taken internally within - * liblttng-ust-ctl to protect timer wakeup_fd - * use from concurrent close. - */ - lttng_ustconsumer_close_stream_wakeup(stream); + if (stream->metadata_flag) { + /* Safe and protected by the stream lock. */ + lttng_ustconsumer_close_metadata(stream->chan); + } else { + /* + * Note: a mutex is taken internally within + * liblttng-ust-ctl to protect timer wakeup_fd + * use from concurrent close. + */ + lttng_ustconsumer_close_stream_wakeup(stream); + } break; default: ERR("Unknown consumer_data type"); @@ -2812,7 +2819,14 @@ restart: break; case CONSUMER_CHANNEL_DEL: { - struct lttng_consumer_stream *stream, *stmp; + /* + * This command should never be called if the channel + * has streams monitored by either the data or metadata + * thread. The consumer only notify this thread with a + * channel del. command if it receives a destroy + * channel command from the session daemon that send it + * if a command prior to the GET_CHANNEL failed. + */ rcu_read_lock(); chan = consumer_find_channel(key); @@ -2825,24 +2839,15 @@ restart: iter.iter.node = &chan->wait_fd_node.node; ret = lttng_ht_del(channel_ht, &iter); assert(ret == 0); - consumer_close_channel_streams(chan); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - /* Delete streams that might have been left in the stream list. */ - cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head, - send_node) { - health_code_update(); - - cds_list_del(&stream->send_node); - lttng_ustconsumer_del_stream(stream); - uatomic_sub(&stream->chan->refcount, 1); - assert(&chan->refcount); - free(stream); - } + health_code_update(); + /* Destroy streams that might have been left in the stream list. */ + clean_channel_stream_list(chan); break; default: ERR("Unknown consumer_data type"); @@ -2895,6 +2900,12 @@ restart: lttng_poll_del(&events, chan->wait_fd); ret = lttng_ht_del(channel_ht, &iter); assert(ret == 0); + + /* + * This will close the wait fd for each stream associated to + * this channel AND monitored by the data/metadata thread thus + * will be clean by the right thread. + */ consumer_close_channel_streams(chan); /* Release our own refcount */