X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=c7bc1e7a05b75010135d18cda82bf7fa322b1150;hb=4df6c8cbd1b0916288b64abdbdb1e83c3d5eae4f;hp=0005b1039cb27d6ced5dfac3537e72bbf737fce0;hpb=10a5031171c7bca5b4498c871b119e5a88b6a3fb;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 0005b1039..c7bc1e7a0 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -292,21 +292,22 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) pthread_mutex_lock(&consumer_data.lock); + /* Delete streams that might have been left in the stream list. */ + cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, + send_node) { + cds_list_del(&stream->send_node); + /* + * Once a stream is added to this list, the buffers were created so + * we have a guarantee that this call will succeed. + */ + consumer_stream_destroy(stream, NULL); + } + switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - /* Delete streams that might have been left in the stream list. */ - cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, - send_node) { - cds_list_del(&stream->send_node); - /* - * Once a stream is added to this list, the buffers were created so - * we have a guarantee that this call will succeed. - */ - consumer_stream_destroy(stream, NULL); - } lttng_ustconsumer_del_channel(channel); break; default: @@ -464,7 +465,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t session_id, int cpu, int *alloc_ret, - enum consumer_channel_type type) + enum consumer_channel_type type, + unsigned int monitor) { int ret; struct lttng_consumer_stream *stream; @@ -486,6 +488,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->gid = gid; stream->net_seq_idx = relayd_id; stream->session_id = session_id; + stream->monitor = monitor; pthread_mutex_init(&stream->lock, NULL); /* If channel is the metadata, flag this stream as metadata. */ @@ -537,7 +540,6 @@ static int add_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht) { int ret = 0; - struct consumer_relayd_sock_pair *relayd; assert(stream); assert(ht); @@ -563,12 +565,6 @@ static int add_stream(struct lttng_consumer_stream *stream, */ lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id); - /* Check and cleanup relayd */ - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { - uatomic_inc(&relayd->refcount); - } - /* * When nb_init_stream_left reaches 0, we don't need to trigger any action * in terms of destroying the associated channel, because the action that @@ -706,6 +702,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, goto end; } uatomic_inc(&relayd->refcount); + stream->sent_to_relayd = 1; } else { ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.", stream->key, stream->net_seq_idx); @@ -1966,7 +1963,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht) { int ret = 0; - struct consumer_relayd_sock_pair *relayd; struct lttng_ht_iter iter; struct lttng_ht_node_u64 *node; @@ -1993,12 +1989,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, node = lttng_ht_iter_get_node_u64(&iter); assert(!node); - /* Find relayd and, if one is found, increment refcount. */ - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { - uatomic_inc(&relayd->refcount); - } - /* * When nb_init_stream_left reaches 0, we don't need to trigger any action * in terms of destroying the associated channel, because the action that @@ -2301,7 +2291,11 @@ void *consumer_thread_data_poll(void *data) goto end; } - local_stream = zmalloc(sizeof(struct lttng_consumer_stream)); + local_stream = zmalloc(sizeof(struct lttng_consumer_stream *)); + if (local_stream == NULL) { + PERROR("local_stream malloc"); + goto end; + } while (1) { high_prio = 0;