X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=0005b1039cb27d6ced5dfac3537e72bbf737fce0;hp=1f96fa9d460c5a97eac61a9c79acadbf239a7e91;hb=10a5031171c7bca5b4498c871b119e5a88b6a3fb;hpb=618a6a28c0956fc6829c165a39ffec97f239096c diff --git a/src/common/consumer.c b/src/common/consumer.c index 1f96fa9d4..0005b1039 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -301,8 +301,11 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, send_node) { cds_list_del(&stream->send_node); - lttng_ustconsumer_del_stream(stream); - free(stream); + /* + * Once a stream is added to this list, the buffers were created so + * we have a guarantee that this call will succeed. + */ + consumer_stream_destroy(stream, NULL); } lttng_ustconsumer_del_channel(channel); break; @@ -312,25 +315,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) goto end; } - /* Empty no monitor streams list. */ - if (!channel->monitor) { - struct lttng_consumer_stream *stream, *stmp; - - /* - * So, these streams are not visible to any data thread. This is why we - * close and free them because they were never added to any data - * structure apart from this one. - */ - cds_list_for_each_entry_safe(stream, stmp, - &channel->stream_no_monitor_list.head, no_monitor_node) { - cds_list_del(&stream->no_monitor_node); - /* Close everything in that stream. */ - consumer_stream_close(stream); - /* Free the ressource. */ - consumer_stream_free(stream); - } - } - rcu_read_lock(); iter.iter.node = &channel->node.node; ret = lttng_ht_del(consumer_data.channel_ht, &iter); @@ -693,6 +677,66 @@ error: return relayd; } +/* + * Find a relayd and send the stream + * + * Returns 0 on success, < 0 on error + */ +int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, + char *path) +{ + int ret = 0; + struct consumer_relayd_sock_pair *relayd; + + assert(stream); + assert(stream->net_seq_idx != -1ULL); + assert(path); + + /* The stream is not metadata. Get relayd reference if exists. */ + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd != NULL) { + /* Add stream on the relayd */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_add_stream(&relayd->control_sock, stream->name, + path, &stream->relayd_stream_id, + stream->chan->tracefile_size, stream->chan->tracefile_count); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + uatomic_inc(&relayd->refcount); + } else { + ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.", + stream->key, stream->net_seq_idx); + ret = -1; + goto end; + } + + DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64, + stream->name, stream->key, stream->net_seq_idx); + +end: + rcu_read_unlock(); + return ret; +} + +/* + * Find a relayd and close the stream + */ +void close_relayd_stream(struct lttng_consumer_stream *stream) +{ + struct consumer_relayd_sock_pair *relayd; + + /* The stream is not metadata. Get relayd reference if exists. */ + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd) { + consumer_stream_relayd_close(stream, relayd); + } + rcu_read_unlock(); +} + /* * Handle stream for relayd transmission if the stream applies for network * streaming where the net sequence index is set. @@ -816,7 +860,6 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->wait_fd = -1; CDS_INIT_LIST_HEAD(&channel->streams.head); - CDS_INIT_LIST_HEAD(&channel->stream_no_monitor_list.head); DBG("Allocated channel (key %" PRIu64 ")", channel->key) @@ -856,7 +899,7 @@ end: pthread_mutex_unlock(&consumer_data.lock); if (!ret && channel->wait_fd != -1 && - channel->metadata_stream == NULL) { + channel->type == CONSUMER_CHANNEL_TYPE_DATA) { notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD); } return ret; @@ -2744,7 +2787,6 @@ restart: lttng_poll_del(&events, chan->wait_fd); ret = lttng_ht_del(channel_ht, &iter); assert(ret == 0); - assert(cds_list_empty(&chan->streams.head)); consumer_close_channel_streams(chan); /* Release our own refcount */