X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=5a09ff51fb751069a3ad2fdbed17e698a0630dd0;hb=45893984238b2e2c12fc0d84b90336c98a6d98c9;hp=a6a4f1a91735911633a8d85d098e63cd01425417;hpb=d88aee689d5bd0067f362a323cb69c37717df59f;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index a6a4f1a91..5a09ff51f 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -88,14 +88,14 @@ static int add_channel(struct lttng_consumer_channel *channel, if (ctx->on_recv_channel != NULL) { ret = ctx->on_recv_channel(channel); if (ret == 0) { - ret = consumer_add_channel(channel); + ret = consumer_add_channel(channel, ctx); } else if (ret < 0) { /* Most likely an ENOMEM. */ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto error; } } else { - ret = consumer_add_channel(channel); + ret = consumer_add_channel(channel, ctx); } DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key); @@ -256,7 +256,7 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, while ((ustream = ustctl_create_stream(channel->uchan, cpu))) { int wait_fd; - wait_fd = ustctl_get_wait_fd(ustream); + wait_fd = ustctl_stream_get_wait_fd(ustream); /* Allocate consumer stream object. */ stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret); @@ -368,11 +368,6 @@ static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream) goto error; } - ret = ustctl_stream_close_wakeup_fd(stream->ustream); - if (ret < 0) { - goto error; - } - error: return ret; } @@ -401,6 +396,11 @@ static int send_sessiond_channel(int sock, goto error; } + ret = ustctl_channel_close_wakeup_fd(channel->uchan); + if (ret < 0) { + goto error; + } + /* The channel was sent successfully to the sessiond at this point. */ cds_list_for_each_entry(stream, &channel->streams.head, send_node) { /* Try to send the stream to the relayd if one is available. */ @@ -476,6 +476,12 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, int sock, goto error; } + channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan); + + if (ret < 0) { + goto error; + } + /* Open all streams for this channel. */ ret = create_ust_streams(channel, ctx); if (ret < 0) { @@ -1277,3 +1283,13 @@ void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht) } rcu_read_unlock(); } + +void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) +{ + int ret; + + ret = ustctl_stream_close_wakeup_fd(stream->ustream); + if (ret < 0) { + ERR("Unable to close wakeup fd"); + } +}